You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2011/11/17 21:39:18 UTC

[cassandra-node] 3 new revisions pushed by tomaz.muraus on 2011-11-17 20:38 GMT

3 new revisions:

Revision: 4b0d5e991fc9
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:23:47 2011
Log:      Apply connection timeout patch (issue #9).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9

Revision: af37fcbc2ca4
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:34:29 2011
Log:      Add CHANGES files.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4

Revision: 9d5c07af9da6
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:35:50 2011
Log:      Apply "Use async for login, learn, use and  
PooledConnection.execute" p...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6

==============================================================================
Revision: 4b0d5e991fc9
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:23:47 2011
Log:      Apply connection timeout patch (issue #9).

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9

Modified:
  /lib/driver.js
  /test/test_driver.js

=======================================
--- /lib/driver.js	Fri Nov 11 08:57:42 2011
+++ /lib/driver.js	Thu Nov 17 12:23:47 2011
@@ -22,10 +22,12 @@
  var logTiming =  
require('logmagic').local('node-cassandra-client.driver.timing');

  var sys = require('sys');
+var constants = require('constants');
  var Buffer = require('buffer').Buffer;
  var EventEmitter = require('events').EventEmitter;

  var thrift = require('thrift');
+var async = require('async');
  var Cassandra = require('./gen-nodejs/Cassandra');
  var ttypes = require('./gen-nodejs/cassandra_types');

@@ -43,6 +45,8 @@
    message: 'null/undefined query parameter'
  };

+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
  /** converts object to a string using toString() method if it exists. */
  function stringify(x) {
    if (x.toString) {
@@ -184,12 +188,14 @@
   * @param config an object used to control the creation of new instances.
   */
  PooledConnection = module.exports.PooledConnection = function(config) {
+  config = config || {};
    this.nodes = [];
    this.holdFor = 10000;
    this.current_node = 0;
    this.use_bigints = config.use_bigints ? true : false;
+  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
    this.log_time = config.log_time || false;
-
+
    // Construct a list of nodes from hosts in <host>:<port> form
    for (var i = 0; i < config.hosts.length; i++) {
      hostSpec = config.hosts[i];
@@ -239,6 +245,7 @@
                                     user: config.user,
                                     pass: config.pass,
                                     use_bigints: self.use_bigints,
+                                   timeout: self.timeout,
                                     log_time: self.log_time});

  	      conn.connect(function(err) {
@@ -329,14 +336,16 @@

  /**
   * @param options: valid parts are:
- *  user, pass, host, port, keyspace, use_bigints, log_time
+ *  user, pass, host, port, keyspace, use_bigints, timeout, log_time
   */
  Connection = module.exports.Connection = function(options) {
+  options = options || {};
    log.info('connecting ' + options.host + ':' + options.port);
    this.validators = {};
    this.con = thrift.createConnection(options.host, options.port);
    this.client = null;
    this.connectionInfo = options;
+  this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
  };


@@ -345,15 +354,23 @@
   * @param callback called when connection is successful or ultimately  
fails (err will be present).
   */
  Connection.prototype.connect = function(callback) {
-  var self = this;
+  var self = this,
+      timeoutId;
+
    this.con.on('error', function(err) {
+    clearTimeout(timeoutId);
      amendError(err);
      callback(err);
    });
+
    this.con.on('close', function() {
+    clearTimeout(timeoutId);
      log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '  
is closed');
    });
+
    this.con.on('connect', function() {
+    clearTimeout(timeoutId);
+
      // preparing the conneciton is a 3-step process.

      // 1) login
@@ -402,7 +419,7 @@
          cb(err);
        });
      };
-
+
      // put it all together, checking for errors along the way.
      login(function(loginErr) {
        if (loginErr) {
@@ -427,11 +444,23 @@
          });
        }
      });
-
    });
-
+
+  function connectTimeout() {
+    var err = new Error('ETIMEDOUT, Operation timed out');
+    err.errno = constants.ETIMEDOUT;
+
+    try {
+      self.con.connection.destroy(err);
+    }
+    catch (e) {}
+
+    self.con = null;
+  }
+
    // kicks off the connection process.
    this.client = thrift.createClient(Cassandra, this.con);
+  timeoutId = setTimeout(connectTimeout, this.timeout);
  };

  Connection.prototype.close = function() {
=======================================
--- /test/test_driver.js	Fri Nov 11 08:48:27 2011
+++ /test/test_driver.js	Thu Nov 17 12:23:47 2011
@@ -769,13 +769,38 @@
  //  });
  //};

+
+exports.testPooledConnectionFailover = function(test, assert) {
+  var server = null;
+  var hosts =  
['google.com:8000', '127.0.0.1:6567', '127.0.0.2', '127.0.0.1:19170'];
+  var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1', use_bigints: true, 'timeout': 5000});
+
+  async.series([
+    function executeQueries(callback) {
+      conn.execute('UPDATE CfUgly SET A=1 WHERE KEY=1', [], function(err) {
+        assert.ifError(err);
+        callback();
+      });
+    }
+  ],
+
+  function(err) {
+    if (server) {
+      server.close();
+    }
+
+    conn.shutdown();
+    test.finish();
+  });
+};
+
  exports.testPooledConnection = function(test, assert) {
    function bail(conn, err) {
      conn.shutdown();
      assert.ifError(err);
      test.finish();
    }
-
+
    //var hosts = ["127.0.0.2:9170", "127.0.0.1:9170"];
    var hosts = ["127.0.0.1:19170"];
    var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1', use_bigints: true});

==============================================================================
Revision: af37fcbc2ca4
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:34:29 2011
Log:      Add CHANGES files.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4

Added:
  /CHANGES

=======================================
--- /dev/null
+++ /CHANGES	Thu Nov 17 12:34:29 2011
@@ -0,0 +1,5 @@
+Changes with cassandra-client in development:
+
+- Set a timeout for connecting to the Cassandra server. The default timeout
+  is 4000 ms, but a user can specify a custom one by passing the 'timeout'
+  option to the PooledConnection / Connection constructor.

==============================================================================
Revision: 9d5c07af9da6
Author:   Tomaz Muraus <to...@tomaz.me>
Date:     Thu Nov 17 12:35:50 2011
Log:      Apply "Use async for login, learn, use and  
PooledConnection.execute" patch
(issue #10).

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6

Modified:
  /lib/driver.js

=======================================
--- /lib/driver.js	Fri Nov 11 13:07:28 2011
+++ /lib/driver.js	Thu Nov 17 12:35:50 2011
@@ -22,6 +22,7 @@
  var logTiming =  
require('logmagic').local('node-cassandra-client.driver.timing');

  var sys = require('sys');
+var constants = require('constants');
  var Buffer = require('buffer').Buffer;
  var EventEmitter = require('events').EventEmitter;

@@ -44,6 +45,8 @@
    message: 'null/undefined query parameter'
  };

+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
  /** converts object to a string using toString() method if it exists. */
  function stringify(x) {
    if (x.toString) {
@@ -185,12 +188,14 @@
   * @param config an object used to control the creation of new instances.
   */
  PooledConnection = module.exports.PooledConnection = function(config) {
+  config = config || {};
    this.nodes = [];
    this.holdFor = 10000;
    this.current_node = 0;
    this.use_bigints = config.use_bigints ? true : false;
+  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
    this.log_time = config.log_time || false;
-
+
    // Construct a list of nodes from hosts in <host>:<port> form
    for (var i = 0; i < config.hosts.length; i++) {
      hostSpec = config.hosts[i];
@@ -240,6 +245,7 @@
                                     user: config.user,
                                     pass: config.pass,
                                     use_bigints: self.use_bigints,
+                                   timeout: self.timeout,
                                     log_time: self.log_time});

  	      conn.connect(function(err) {
@@ -342,14 +348,16 @@

  /**
   * @param options: valid parts are:
- *  user, pass, host, port, keyspace, use_bigints, log_time
+ *  user, pass, host, port, keyspace, use_bigints, timeout, log_time
   */
  Connection = module.exports.Connection = function(options) {
+  options = options || {};
    log.info('connecting ' + options.host + ':' + options.port);
    this.validators = {};
    this.con = thrift.createConnection(options.host, options.port);
    this.client = null;
    this.connectionInfo = options;
+  this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
  };


@@ -358,15 +366,23 @@
   * @param callback called when connection is successful or ultimately  
fails (err will be present).
   */
  Connection.prototype.connect = function(callback) {
-  var self = this;
+  var self = this,
+      timeoutId;
+
    this.con.on('error', function(err) {
+    clearTimeout(timeoutId);
      amendError(err);
      callback(err);
    });
+
    this.con.on('close', function() {
+    clearTimeout(timeoutId);
      log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '  
is closed');
    });
+
    this.con.on('connect', function() {
+    clearTimeout(timeoutId);
+
      // preparing the conneciton is a 3-step process.

      // 1) login
@@ -429,11 +445,23 @@

        callback(err);
      });
-
    });
-
+
+  function connectTimeout() {
+    var err = new Error('ETIMEDOUT, Operation timed out');
+    err.errno = constants.ETIMEDOUT;
+
+    try {
+      self.con.connection.destroy(err);
+    }
+    catch (e) {}
+
+    self.con = null;
+  }
+
    // kicks off the connection process.
    this.client = thrift.createClient(Cassandra, this.con);
+  timeoutId = setTimeout(connectTimeout, this.timeout);
  };

  Connection.prototype.close = function() {