You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2011/11/17 21:39:18 UTC
[cassandra-node] 3 new revisions pushed by tomaz.muraus on 2011-11-17
20:38 GMT
3 new revisions:
Revision: 4b0d5e991fc9
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:23:47 2011
Log: Apply connection timeout patch (issue #9).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9
Revision: af37fcbc2ca4
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:34:29 2011
Log: Add CHANGES files.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4
Revision: 9d5c07af9da6
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:35:50 2011
Log: Apply "Use async for login, learn, use and
PooledConnection.execute" p...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6
==============================================================================
Revision: 4b0d5e991fc9
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:23:47 2011
Log: Apply connection timeout patch (issue #9).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9
Modified:
/lib/driver.js
/test/test_driver.js
=======================================
--- /lib/driver.js Fri Nov 11 08:57:42 2011
+++ /lib/driver.js Thu Nov 17 12:23:47 2011
@@ -22,10 +22,12 @@
var logTiming =
require('logmagic').local('node-cassandra-client.driver.timing');
var sys = require('sys');
+var constants = require('constants');
var Buffer = require('buffer').Buffer;
var EventEmitter = require('events').EventEmitter;
var thrift = require('thrift');
+var async = require('async');
var Cassandra = require('./gen-nodejs/Cassandra');
var ttypes = require('./gen-nodejs/cassandra_types');
@@ -43,6 +45,8 @@
message: 'null/undefined query parameter'
};
+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
/** converts object to a string using toString() method if it exists. */
function stringify(x) {
if (x.toString) {
@@ -184,12 +188,14 @@
* @param config an object used to control the creation of new instances.
*/
PooledConnection = module.exports.PooledConnection = function(config) {
+ config = config || {};
this.nodes = [];
this.holdFor = 10000;
this.current_node = 0;
this.use_bigints = config.use_bigints ? true : false;
+ this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
this.log_time = config.log_time || false;
-
+
// Construct a list of nodes from hosts in <host>:<port> form
for (var i = 0; i < config.hosts.length; i++) {
hostSpec = config.hosts[i];
@@ -239,6 +245,7 @@
user: config.user,
pass: config.pass,
use_bigints: self.use_bigints,
+ timeout: self.timeout,
log_time: self.log_time});
conn.connect(function(err) {
@@ -329,14 +336,16 @@
/**
* @param options: valid parts are:
- * user, pass, host, port, keyspace, use_bigints, log_time
+ * user, pass, host, port, keyspace, use_bigints, timeout, log_time
*/
Connection = module.exports.Connection = function(options) {
+ options = options || {};
log.info('connecting ' + options.host + ':' + options.port);
this.validators = {};
this.con = thrift.createConnection(options.host, options.port);
this.client = null;
this.connectionInfo = options;
+ this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
};
@@ -345,15 +354,23 @@
* @param callback called when connection is successful or ultimately
fails (err will be present).
*/
Connection.prototype.connect = function(callback) {
- var self = this;
+ var self = this,
+ timeoutId;
+
this.con.on('error', function(err) {
+ clearTimeout(timeoutId);
amendError(err);
callback(err);
});
+
this.con.on('close', function() {
+ clearTimeout(timeoutId);
log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '
is closed');
});
+
this.con.on('connect', function() {
+ clearTimeout(timeoutId);
+
// preparing the conneciton is a 3-step process.
// 1) login
@@ -402,7 +419,7 @@
cb(err);
});
};
-
+
// put it all together, checking for errors along the way.
login(function(loginErr) {
if (loginErr) {
@@ -427,11 +444,23 @@
});
}
});
-
});
-
+
+ function connectTimeout() {
+ var err = new Error('ETIMEDOUT, Operation timed out');
+ err.errno = constants.ETIMEDOUT;
+
+ try {
+ self.con.connection.destroy(err);
+ }
+ catch (e) {}
+
+ self.con = null;
+ }
+
// kicks off the connection process.
this.client = thrift.createClient(Cassandra, this.con);
+ timeoutId = setTimeout(connectTimeout, this.timeout);
};
Connection.prototype.close = function() {
=======================================
--- /test/test_driver.js Fri Nov 11 08:48:27 2011
+++ /test/test_driver.js Thu Nov 17 12:23:47 2011
@@ -769,13 +769,38 @@
// });
//};
+
+exports.testPooledConnectionFailover = function(test, assert) {
+ var server = null;
+ var hosts =
['google.com:8000', '127.0.0.1:6567', '127.0.0.2', '127.0.0.1:19170'];
+ var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1', use_bigints: true, 'timeout': 5000});
+
+ async.series([
+ function executeQueries(callback) {
+ conn.execute('UPDATE CfUgly SET A=1 WHERE KEY=1', [], function(err) {
+ assert.ifError(err);
+ callback();
+ });
+ }
+ ],
+
+ function(err) {
+ if (server) {
+ server.close();
+ }
+
+ conn.shutdown();
+ test.finish();
+ });
+};
+
exports.testPooledConnection = function(test, assert) {
function bail(conn, err) {
conn.shutdown();
assert.ifError(err);
test.finish();
}
-
+
//var hosts = ["127.0.0.2:9170", "127.0.0.1:9170"];
var hosts = ["127.0.0.1:19170"];
var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1', use_bigints: true});
==============================================================================
Revision: af37fcbc2ca4
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:34:29 2011
Log: Add CHANGES files.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4
Added:
/CHANGES
=======================================
--- /dev/null
+++ /CHANGES Thu Nov 17 12:34:29 2011
@@ -0,0 +1,5 @@
+Changes with cassandra-client in development:
+
+- Set a timeout for connecting to the cassandra server. Default timeout
+ is 4000 ms, but a user can specify custom one by passing 'timeout'
+ option to the PooledConnection / Connection constructor.
==============================================================================
Revision: 9d5c07af9da6
Author: Tomaz Muraus <to...@tomaz.me>
Date: Thu Nov 17 12:35:50 2011
Log: Apply "Use async for login, learn, use and
PooledConnection.execute" patch
(issue #10).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Fri Nov 11 13:07:28 2011
+++ /lib/driver.js Thu Nov 17 12:35:50 2011
@@ -22,6 +22,7 @@
var logTiming =
require('logmagic').local('node-cassandra-client.driver.timing');
var sys = require('sys');
+var constants = require('constants');
var Buffer = require('buffer').Buffer;
var EventEmitter = require('events').EventEmitter;
@@ -44,6 +45,8 @@
message: 'null/undefined query parameter'
};
+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
/** converts object to a string using toString() method if it exists. */
function stringify(x) {
if (x.toString) {
@@ -185,12 +188,14 @@
* @param config an object used to control the creation of new instances.
*/
PooledConnection = module.exports.PooledConnection = function(config) {
+ config = config || {};
this.nodes = [];
this.holdFor = 10000;
this.current_node = 0;
this.use_bigints = config.use_bigints ? true : false;
+ this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
this.log_time = config.log_time || false;
-
+
// Construct a list of nodes from hosts in <host>:<port> form
for (var i = 0; i < config.hosts.length; i++) {
hostSpec = config.hosts[i];
@@ -240,6 +245,7 @@
user: config.user,
pass: config.pass,
use_bigints: self.use_bigints,
+ timeout: self.timeout,
log_time: self.log_time});
conn.connect(function(err) {
@@ -342,14 +348,16 @@
/**
* @param options: valid parts are:
- * user, pass, host, port, keyspace, use_bigints, log_time
+ * user, pass, host, port, keyspace, use_bigints, timeout, log_time
*/
Connection = module.exports.Connection = function(options) {
+ options = options || {};
log.info('connecting ' + options.host + ':' + options.port);
this.validators = {};
this.con = thrift.createConnection(options.host, options.port);
this.client = null;
this.connectionInfo = options;
+ this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
};
@@ -358,15 +366,23 @@
* @param callback called when connection is successful or ultimately
fails (err will be present).
*/
Connection.prototype.connect = function(callback) {
- var self = this;
+ var self = this,
+ timeoutId;
+
this.con.on('error', function(err) {
+ clearTimeout(timeoutId);
amendError(err);
callback(err);
});
+
this.con.on('close', function() {
+ clearTimeout(timeoutId);
log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '
is closed');
});
+
this.con.on('connect', function() {
+ clearTimeout(timeoutId);
+
// preparing the conneciton is a 3-step process.
// 1) login
@@ -429,11 +445,23 @@
callback(err);
});
-
});
-
+
+ function connectTimeout() {
+ var err = new Error('ETIMEDOUT, Operation timed out');
+ err.errno = constants.ETIMEDOUT;
+
+ try {
+ self.con.connection.destroy(err);
+ }
+ catch (e) {}
+
+ self.con = null;
+ }
+
// kicks off the connection process.
this.client = thrift.createClient(Cassandra, this.con);
+ timeoutId = setTimeout(connectTimeout, this.timeout);
};
Connection.prototype.close = function() {