You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/01/31 00:02:47 UTC

[cassandra-node] 8 new revisions pushed by gdusbabek@gmail.com on 2012-01-30 23:02 GMT

8 new revisions:

Revision: d5c7427b3cc8
Author:   Christoph Tavan <de...@tavan.de>
Date:     Tue Jan 24 09:50:16 2012
Log:      Perform clean shutdown when PooledConnection.shutdown() is called
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8

Revision: 634d7d6bd642
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 07:49:43 2012
Log:      Add load test for connection pool and fix issue, where  
connections wer...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642

Revision: 4c54109897a6
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:24:50 2012
Log:      Emit drain event after the last execute() callback of a pool has  
been ...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6

Revision: 422553851a5f
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:25:26 2012
Log:      Simplify testPooledConnectionLoad and add  
testPooledConnectionShutdown...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f

Revision: 2184501cc648
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:27:35 2012
Log:      Cleanup test_driver.js (remove some console.log and trailing  
whitespac...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648

Revision: 1a6bfbb5136b
Author:   Christoph Tavan <de...@tavan.de>
Date:     Mon Jan 30 13:11:01 2012
Log:      Listen for the drain event only once for shutdown, also reset  
shutting...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b

Revision: db75d6b18bb5
Author:   gdusbabek <gd...@gmail.com>
Date:     Mon Jan 30 14:58:56 2012
Log:      * PooledConnection.connect() establishes connections prior to  
execute(...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5

Revision: 00b61ccdde0c
Author:   gdusbabek <gd...@gmail.com>
Date:     Mon Jan 30 14:59:17 2012
Log:      update CHANGES.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c

==============================================================================
Revision: d5c7427b3cc8
Author:   Christoph Tavan <de...@tavan.de>
Date:     Tue Jan 24 09:50:16 2012
Log:      Perform clean shutdown when PooledConnection.shutdown() is called

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8

Modified:
  /lib/driver.js

=======================================
--- /lib/driver.js	Mon Jan 30 10:26:55 2012
+++ /lib/driver.js	Tue Jan 24 09:50:16 2012
@@ -331,10 +331,33 @@
    timeoutId = setTimeout(connectTimeout, this.timeout);
  };

-Connection.prototype.close = function() {
-  this.con.end();
-  this.con = null;
-  this.client = null;
+/**
+ * Closes the current connection
+ *
+ * `this.con` is a socket connection. For failed socket connections
+ * `this.con.end()` may not trigger a `close` event. So in the cases where  
we
+ * are experiencing problems with the connection, we can just `end()` it
+ * without waiting for the `close` event.
+ *
+ * Note that the callback is only called, if the `close` event is fired.  
Also
+ * we only wait for the `close` event if a callback is given.
+ *
+ * @param {function} callback
+ */
+Connection.prototype.close = function(callback) {
+  var self = this;
+  if (!callback) {
+    self.con.end();
+    self.con = null;
+    self.client = null;
+    return;
+  }
+  self.con.on('close', function(err) {
+    self.con = null;
+    self.client = null;
+    callback();
+  });
+  self.con.end();
  };

  /**
@@ -493,7 +516,13 @@
    this.use_bigints = config.use_bigints ? true : false;
    this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
    this.log_time = config.log_time || false;
-
+
+  // Number of currently running queries
+  this.running = 0;
+
+  // Shutdown mode
+  this.shuttingDown = false;
+
    // Construct a list of nodes from hosts in <host>:<port> form
    for (var i = 0; i < config.hosts.length; i++) {
      var hostSpec = config.hosts[i];
@@ -562,8 +591,23 @@
   *    UPDATE             : callback(err)
   *    DELETE             : callback(err)
   */
-PooledConnection.prototype.execute = function(query, args, callback) {
+PooledConnection.prototype.execute = function(query, args,  
executeCallback) {
    var self = this;
+
+  if (self.shuttingDown) {
+    executeCallback(new Error('Unable to execute query, connection pool is  
shutting down.'));
+    return;
+  }
+
+  self.running++;
+  var callback = function() {
+    self.running--;
+    if (self.running === 0) {
+      self.emit('drain');
+    }
+    executeCallback.apply(self, arguments);
+  };
+
    self._getNextCon(function(err, con) {
      if (err) {
        callback(err, null);
@@ -631,6 +675,7 @@
      } else if (c.unhealthyAt > 0) {
        callback();
      } else if (!c.connected) {
+      c.taken = true;
        c.connect(function(err) {
          if (c.connected) {
            con = c;
@@ -665,14 +710,40 @@
   * @param callback called when the pool is fully shutdown
   */
  PooledConnection.prototype.shutdown = function(callback) {
-  // todo: we need to be able to let pending execute()s finish and block  
executes from happening while shutting down.
-  this.connections.forEach(function(con) {
-    if (con.connected) {
-      con.close();
-    }
+  var self = this;
+
+  // Start shutdown mode, causes no new execute()'s to be accepted
+  if (self.shuttingDown) {
+    return;
+  }
+  self.shuttingDown = true;
+
+  callback = callback || function() {};
+
+  // Close all open connections as soon as the pool has drained
+  self.on('drain', function() {
+    self._closeConnections(callback);
    });
-  if (callback) {
-    callback();
+
+  // If no queries were running, emit the drain event immediately
+  if (self.running === 0) {
+    self.emit('drain');
    }
  };

+/**
+ * Close all connected connections.
+ *
+ * @param {function} closeCallback that is fired once all connections are  
closed
+ */
+PooledConnection.prototype._closeConnections = function(closeCallback) {
+  async.forEach(this.connections, function(con, cb) {
+    if (con.connected) {
+      con.close(cb);
+    } else {
+      cb(null);
+    }
+  }, function(err) {
+    closeCallback(err);
+  });
+};

==============================================================================
Revision: 634d7d6bd642
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 07:49:43 2012
Log:      Add load test for connection pool and fix issue, where  
connections were marked as 'taken' too early

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642

Modified:
  /lib/driver.js
  /test/test_driver.js

=======================================
--- /lib/driver.js	Tue Jan 24 09:50:16 2012
+++ /lib/driver.js	Thu Jan 26 07:49:43 2012
@@ -613,7 +613,6 @@
        callback(err, null);
      } else {
        try {
-        con.taken = true;
          con.execute(query, args, function(err, result) {
            con.taken = false;
            var recoverableError = null;
=======================================
--- /test/test_driver.js	Mon Jan 30 10:26:55 2012
+++ /test/test_driver.js	Thu Jan 26 07:49:43 2012
@@ -1008,3 +1008,46 @@
      }
    });
  };
+
+
+exports.testPooledConnectionLoad = function(test, assert) {
+  var hosts = ['127.0.0.1:19170'];
+  var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1'});
+
+  var count = 3000;
+
+  async.waterfall([
+    function(cb) {
+      conn.execute('TRUNCATE CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      var executes = [];
+      for (var i = 0; i < count; i++) {
+        executes.push(function(parallelCb) {
+          var uuid = new UUID().toString();
+          conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+            'testCol',
+            'testVal',
+            uuid
+          ], parallelCb);
+        });
+      }
+      async.parallel(executes, function(err) {
+        assert.ifError(err);
+        cb();
+      });
+    },
+    function(cb) {
+      conn.execute('SELECT COUNT(*) FROM CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      assert.equal(res[0].colHash.count, count);
+      cb();
+    },
+    conn.shutdown.bind(conn)
+  ],
+  function(err) {
+    assert.ifError(err);
+    test.finish();
+  });
+};

==============================================================================
Revision: 4c54109897a6
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:24:50 2012
Log:      Emit drain event after the last execute() callback of a pool has  
been called

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6

Modified:
  /lib/driver.js

=======================================
--- /lib/driver.js	Thu Jan 26 07:49:43 2012
+++ /lib/driver.js	Thu Jan 26 14:24:50 2012
@@ -602,10 +602,10 @@
    self.running++;
    var callback = function() {
      self.running--;
+    executeCallback.apply(self, arguments);
      if (self.running === 0) {
        self.emit('drain');
      }
-    executeCallback.apply(self, arguments);
    };

    self._getNextCon(function(err, con) {

==============================================================================
Revision: 422553851a5f
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:25:26 2012
Log:      Simplify testPooledConnectionLoad and add  
testPooledConnectionShutdown

- testPooledConnectionLoad now uses indexed keys instead of uuids which
   is more reproducible.
- testPooledConnectionShutdown checks whether all execute() callbacks
   are being fired *before* the callback which is passed to shutdown().

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f

Modified:
  /test/test_driver.js

=======================================
--- /test/test_driver.js	Thu Jan 26 07:49:43 2012
+++ /test/test_driver.js	Thu Jan 26 14:25:26 2012
@@ -1023,14 +1023,15 @@
      function(res, cb) {
        var executes = [];
        for (var i = 0; i < count; i++) {
-        executes.push(function(parallelCb) {
-          var uuid = new UUID().toString();
-          conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
-            'testCol',
-            'testVal',
-            uuid
-          ], parallelCb);
-        });
+        (function(index) {
+          executes.push(function(parallelCb) {
+            conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+              'testCol',
+              'testVal',
+              'testKey'+index
+            ], parallelCb);
+          });
+        })(i);
        }
        async.parallel(executes, function(err) {
          assert.ifError(err);
@@ -1044,10 +1045,41 @@
        assert.equal(res[0].colHash.count, count);
        cb();
      },
-    conn.shutdown.bind(conn)
+    function(cb) {
+      conn.execute('TRUNCATE CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      conn.shutdown(cb);
+    }
    ],
    function(err) {
      assert.ifError(err);
      test.finish();
    });
  };
+
+
+// We want to test if all executes of a pooled connection are finished  
before
+// the shutdown callback is called.
+exports.testPooledConnectionShutdown = function(test, assert) {
+  var hosts = ['127.0.0.1:19170'];
+  var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1'});
+
+  var expected = 100;
+  var cbcount = 0;
+  var spy = function(err, res) {
+    assert.ifError(err);
+    cbcount++;
+  };
+
+  for (var i = 0; i < expected; i++) {
+    (function(index) {
+      conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?',  
['col', 'val', 'key'+index], spy);
+    })(i);
+  }
+  conn.shutdown(function(err) {
+    assert.ifError(err);
+    assert.equal(cbcount, expected);
+    test.finish();
+  });
+};

==============================================================================
Revision: 2184501cc648
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:27:35 2012
Log:      Cleanup test_driver.js (remove some console.log and trailing  
whitespaces)

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648

Modified:
  /test/test_driver.js

=======================================
--- /test/test_driver.js	Thu Jan 26 14:25:26 2012
+++ /test/test_driver.js	Thu Jan 26 14:27:35 2012
@@ -215,7 +215,6 @@
      function executeCountQuery(callback) {
        con.execute('SELECT COUNT(*) FROM CfLong', [], function(err, rows) {
          assert.ifError(err);
-        console.log(rows[0].cols);
          assert.strictEqual(rows[0].cols[0].value, 5);
          callback();
        });
@@ -453,7 +452,6 @@
            con.close();
            assert.strictEqual(rows.rowCount(), 1);
            var row = rows[0];
-          console.log(row);
            assert.strictEqual(row.key.toString('base64'),  
binaryParams[2].toString('base64'));
            assert.strictEqual(row.cols[0].name.toString('base64'),  
binaryParams[0].toString('base64'));
            assert.strictEqual(row.cols[0].value.toString('base64'),  
binaryParams[1].toString('base64'));

==============================================================================
Revision: 1a6bfbb5136b
Author:   Christoph Tavan <de...@tavan.de>
Date:     Mon Jan 30 13:11:01 2012
Log:      Listen for the drain event only once for shutdown, also reset  
shuttingDown state

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b

Modified:
  /lib/driver.js

=======================================
--- /lib/driver.js	Thu Jan 26 14:24:50 2012
+++ /lib/driver.js	Mon Jan 30 13:11:01 2012
@@ -720,8 +720,11 @@
    callback = callback || function() {};

    // Close all open connections as soon as the pool has drained
-  self.on('drain', function() {
-    self._closeConnections(callback);
+  self.once('drain', function() {
+    self._closeConnections(function() {
+      self.shuttingDown = false;
+      callback();
+    });
    });

    // If no queries were running, emit the drain event immediately

==============================================================================
Revision: db75d6b18bb5
Author:   gdusbabek <gd...@gmail.com>
Date:     Mon Jan 30 14:58:56 2012
Log:      * PooledConnection.connect() establishes connections prior to  
execute()ing.
* Increase test timeout to 30s.
* Node version >= 0.6.7
* Increate connection setup timeout to 10s.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5

Modified:
  /lib/driver.js
  /package.json
  /test/test_driver.js

=======================================
--- /lib/driver.js	Mon Jan 30 13:11:01 2012
+++ /lib/driver.js	Mon Jan 30 14:58:56 2012
@@ -250,7 +250,7 @@
        }
      };

-    // 2) login.
+    // 2) learn.
      var learn = function(cb) {
        var timeoutId = setTimeout(function() {
          if (timeoutId) {
@@ -579,6 +579,33 @@
    return incrCount >= this.connections.length  
&& !this.connections[this.current_node].isHealthy();
  };

+
+/**
+ * Establishes connections to all hosts in the pool.
+ * @return callback expects err (if all the connect()s failed.
+ */
+PooledConnection.prototype.connect = function(callback) {
+  var self = this;
+  var errors = [];
+  async.forEach(self.connections, function doConnect(con, callback) {
+    con.connect(function(err) {
+      if (err) {
+        errors.push(err);
+      }
+      callback();
+    });
+  }, function() {
+    if (errors.length === self.connections.length) {
+      var error = new Error('There were errors connecting to every  
connection');
+      error._individualErrors = errors;
+      callback(error);
+    } else {
+      callback();
+    }
+  });
+};
+
+
  /**
   * executes any query
   * @param query any CQL statement with '?' placeholders.
@@ -652,12 +679,14 @@
    var tryStart = new Date().getTime();
    var con = null;
    var allBad = false;
+  var timedOut = false;
    var takens = [];
    async.whilst(function truthTest() {
      // should the timeout of getting a single connection be the sum of all  
connections?  Think of a scenario where the
      // timeout is N, but the first X nodes are unresponsive.  You still  
want to allow access to the subsequent good
      // nodes.
-    return !allBad && con === null && (new Date().getTime() - tryStart) <  
(self.timeout * self.connections.length);
+    timedOut = (new Date().getTime() - tryStart) >= (self.timeout *  
self.connections.length)
+    return !allBad && con === null && !timedOut;
    }, function tryConnect(callback) {
      var c = self.connections[self.current_node];
      allBad = self._incr();
@@ -693,7 +722,9 @@
        callback();
      }
    }, function whenDone(err) {
-    if (allBad && !err) {
+    if (timedOut && !err) {
+      err = new Error('Timed out waiting for connection ' + (self.timeout  
* self.connections.length));
+    } else if (allBad && !err) {
        err = new Error('All connections are unhealthy.');
      } else if (!con && !err) {
        err = new Error('connection was not set');
=======================================
--- /package.json	Thu Jan 26 14:40:08 2012
+++ /package.json	Mon Jan 30 14:58:56 2012
@@ -20,10 +20,10 @@
      "lib": "lib"
    },
    "scripts": {
-    "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js  
test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks"
+    "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js  
test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks  
--timeout 30000"
    },
    "engines": {
-    "node": ">= 0.4.0"
+    "node": ">= 0.6.7"
    },
    "dependencies": {
      "async": ">= 0.1.12",
=======================================
--- /test/test_driver.js	Thu Jan 26 14:27:35 2012
+++ /test/test_driver.js	Mon Jan 30 14:58:56 2012
@@ -1010,11 +1010,13 @@

  exports.testPooledConnectionLoad = function(test, assert) {
    var hosts = ['127.0.0.1:19170'];
-  var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1'});
+  var conn = new PooledConnection({'hosts':  
hosts, 'keyspace': 'Keyspace1', 'timeout': 10000});

    var count = 3000;

    async.waterfall([
+    // establish connections prior to executing statements.
+    //conn.connect.bind(conn),
      function(cb) {
        conn.execute('TRUNCATE CfUtf8', [], cb);
      },

==============================================================================
Revision: 00b61ccdde0c
Author:   gdusbabek <gd...@gmail.com>
Date:     Mon Jan 30 14:59:17 2012
Log:      update CHANGES.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c

Modified:
  /CHANGES

=======================================
--- /CHANGES	Mon Jan 30 11:08:11 2012
+++ /CHANGES	Mon Jan 30 14:59:17 2012
@@ -1,5 +1,7 @@
  Changes with cassandra-client X.Y.Z

+- Put drain back.  
(http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=32)
+
  - Allow blob updates  
(http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=28)

  - Remove logmagic dependency, see README.md for how to capture logging  
events.