You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/01/31 00:02:47 UTC
[cassandra-node] 8 new revisions pushed by gdusbabek@gmail.com on
2012-01-30 23:02 GMT
8 new revisions:
Revision: d5c7427b3cc8
Author: Christoph Tavan <de...@tavan.de>
Date: Tue Jan 24 09:50:16 2012
Log: Perform clean shutdown when PooledConnection.shutdown() is called
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8
Revision: 634d7d6bd642
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 07:49:43 2012
Log: Add load test for connection pool and fix issue, where
connections wer...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642
Revision: 4c54109897a6
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:24:50 2012
Log: Emit drain event after the last execute() callback of a pool has
been ...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6
Revision: 422553851a5f
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:25:26 2012
Log: Simplify testPooledConnectionLoad and add
testPooledConnectionShutdown...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f
Revision: 2184501cc648
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:27:35 2012
Log: Cleanup test_driver.js (remove some console.log and trailing
whitespac...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648
Revision: 1a6bfbb5136b
Author: Christoph Tavan <de...@tavan.de>
Date: Mon Jan 30 13:11:01 2012
Log: Listen for the drain event only once for shutdown, also reset
shutting...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b
Revision: db75d6b18bb5
Author: gdusbabek <gd...@gmail.com>
Date: Mon Jan 30 14:58:56 2012
Log: * PooledConnection.connect() establishes connections prior to
execute(...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5
Revision: 00b61ccdde0c
Author: gdusbabek <gd...@gmail.com>
Date: Mon Jan 30 14:59:17 2012
Log: update CHANGES.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c
==============================================================================
Revision: d5c7427b3cc8
Author: Christoph Tavan <de...@tavan.de>
Date: Tue Jan 24 09:50:16 2012
Log: Perform clean shutdown when PooledConnection.shutdown() is called
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Mon Jan 30 10:26:55 2012
+++ /lib/driver.js Tue Jan 24 09:50:16 2012
@@ -331,10 +331,33 @@
timeoutId = setTimeout(connectTimeout, this.timeout);
};
-Connection.prototype.close = function() {
- this.con.end();
- this.con = null;
- this.client = null;
+/**
+ * Closes the current connection
+ *
+ * `this.con` is a socket connection. For failed socket connections
+ * `this.con.end()` may not trigger a `close` event. So in the cases where
we
+ * are experiencing problems with the connection, we can just `end()` it
+ * without waiting for the `close` event.
+ *
+ * Note that the callback is only called, if the `close` event is fired.
Also
+ * we only wait for the `close` event if a callback is given.
+ *
+ * @param {function} callback
+ */
+Connection.prototype.close = function(callback) {
+ var self = this;
+ if (!callback) {
+ self.con.end();
+ self.con = null;
+ self.client = null;
+ return;
+ }
+ self.con.on('close', function(err) {
+ self.con = null;
+ self.client = null;
+ callback();
+ });
+ self.con.end();
};
/**
@@ -493,7 +516,13 @@
this.use_bigints = config.use_bigints ? true : false;
this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
this.log_time = config.log_time || false;
-
+
+ // Number of currently running queries
+ this.running = 0;
+
+ // Shutdown mode
+ this.shuttingDown = false;
+
// Construct a list of nodes from hosts in <host>:<port> form
for (var i = 0; i < config.hosts.length; i++) {
var hostSpec = config.hosts[i];
@@ -562,8 +591,23 @@
* UPDATE : callback(err)
* DELETE : callback(err)
*/
-PooledConnection.prototype.execute = function(query, args, callback) {
+PooledConnection.prototype.execute = function(query, args,
executeCallback) {
var self = this;
+
+ if (self.shuttingDown) {
+ executeCallback(new Error('Unable to execute query, connection pool is
shutting down.'));
+ return;
+ }
+
+ self.running++;
+ var callback = function() {
+ self.running--;
+ if (self.running === 0) {
+ self.emit('drain');
+ }
+ executeCallback.apply(self, arguments);
+ };
+
self._getNextCon(function(err, con) {
if (err) {
callback(err, null);
@@ -631,6 +675,7 @@
} else if (c.unhealthyAt > 0) {
callback();
} else if (!c.connected) {
+ c.taken = true;
c.connect(function(err) {
if (c.connected) {
con = c;
@@ -665,14 +710,40 @@
* @param callback called when the pool is fully shutdown
*/
PooledConnection.prototype.shutdown = function(callback) {
- // todo: we need to be able to let pending execute()s finish and block
executes from happening while shutting down.
- this.connections.forEach(function(con) {
- if (con.connected) {
- con.close();
- }
+ var self = this;
+
+ // Start shutdown mode, causes no new execute()'s to be accepted
+ if (self.shuttingDown) {
+ return;
+ }
+ self.shuttingDown = true;
+
+ callback = callback || function() {};
+
+ // Close all open connections as soon as the pool has drained
+ self.on('drain', function() {
+ self._closeConnections(callback);
});
- if (callback) {
- callback();
+
+ // If no queries were running, emit the drain event immediately
+ if (self.running === 0) {
+ self.emit('drain');
}
};
+/**
+ * Close all connected connections.
+ *
+ * @param {function} closeCallback that is fired once all connections are
closed
+ */
+PooledConnection.prototype._closeConnections = function(closeCallback) {
+ async.forEach(this.connections, function(con, cb) {
+ if (con.connected) {
+ con.close(cb);
+ } else {
+ cb(null);
+ }
+ }, function(err) {
+ closeCallback(err);
+ });
+};
==============================================================================
Revision: 634d7d6bd642
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 07:49:43 2012
Log: Add load test for connection pool and fix issue, where
connections were marked as 'taken' too early
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642
Modified:
/lib/driver.js
/test/test_driver.js
=======================================
--- /lib/driver.js Tue Jan 24 09:50:16 2012
+++ /lib/driver.js Thu Jan 26 07:49:43 2012
@@ -613,7 +613,6 @@
callback(err, null);
} else {
try {
- con.taken = true;
con.execute(query, args, function(err, result) {
con.taken = false;
var recoverableError = null;
=======================================
--- /test/test_driver.js Mon Jan 30 10:26:55 2012
+++ /test/test_driver.js Thu Jan 26 07:49:43 2012
@@ -1008,3 +1008,46 @@
}
});
};
+
+
+exports.testPooledConnectionLoad = function(test, assert) {
+ var hosts = ['127.0.0.1:19170'];
+ var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1'});
+
+ var count = 3000;
+
+ async.waterfall([
+ function(cb) {
+ conn.execute('TRUNCATE CfUtf8', [], cb);
+ },
+ function(res, cb) {
+ var executes = [];
+ for (var i = 0; i < count; i++) {
+ executes.push(function(parallelCb) {
+ var uuid = new UUID().toString();
+ conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+ 'testCol',
+ 'testVal',
+ uuid
+ ], parallelCb);
+ });
+ }
+ async.parallel(executes, function(err) {
+ assert.ifError(err);
+ cb();
+ });
+ },
+ function(cb) {
+ conn.execute('SELECT COUNT(*) FROM CfUtf8', [], cb);
+ },
+ function(res, cb) {
+ assert.equal(res[0].colHash.count, count);
+ cb();
+ },
+ conn.shutdown.bind(conn)
+ ],
+ function(err) {
+ assert.ifError(err);
+ test.finish();
+ });
+};
==============================================================================
Revision: 4c54109897a6
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:24:50 2012
Log: Emit drain event after the last execute() callback of a pool has
been called
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Thu Jan 26 07:49:43 2012
+++ /lib/driver.js Thu Jan 26 14:24:50 2012
@@ -602,10 +602,10 @@
self.running++;
var callback = function() {
self.running--;
+ executeCallback.apply(self, arguments);
if (self.running === 0) {
self.emit('drain');
}
- executeCallback.apply(self, arguments);
};
self._getNextCon(function(err, con) {
==============================================================================
Revision: 422553851a5f
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:25:26 2012
Log: Simplify testPooledConnectionLoad and add
testPooledConnectionShutdown
- testPooledConnectionLoad now uses indexed keys instead of uuids which
is more reproducible.
- testPooledConnectionShutdown checks whether all execute() callbacks
are being fired *before* the callback which is passed to shutdown().
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f
Modified:
/test/test_driver.js
=======================================
--- /test/test_driver.js Thu Jan 26 07:49:43 2012
+++ /test/test_driver.js Thu Jan 26 14:25:26 2012
@@ -1023,14 +1023,15 @@
function(res, cb) {
var executes = [];
for (var i = 0; i < count; i++) {
- executes.push(function(parallelCb) {
- var uuid = new UUID().toString();
- conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
- 'testCol',
- 'testVal',
- uuid
- ], parallelCb);
- });
+ (function(index) {
+ executes.push(function(parallelCb) {
+ conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+ 'testCol',
+ 'testVal',
+ 'testKey'+index
+ ], parallelCb);
+ });
+ })(i);
}
async.parallel(executes, function(err) {
assert.ifError(err);
@@ -1044,10 +1045,41 @@
assert.equal(res[0].colHash.count, count);
cb();
},
- conn.shutdown.bind(conn)
+ function(cb) {
+ conn.execute('TRUNCATE CfUtf8', [], cb);
+ },
+ function(res, cb) {
+ conn.shutdown(cb);
+ }
],
function(err) {
assert.ifError(err);
test.finish();
});
};
+
+
+// We want to test if all executes of a pooled connection are finished
before
+// the shutdown callback is called.
+exports.testPooledConnectionShutdown = function(test, assert) {
+ var hosts = ['127.0.0.1:19170'];
+ var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1'});
+
+ var expected = 100;
+ var cbcount = 0;
+ var spy = function(err, res) {
+ assert.ifError(err);
+ cbcount++;
+ };
+
+ for (var i = 0; i < expected; i++) {
+ (function(index) {
+ conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?',
['col', 'val', 'key'+index], spy);
+ })(i);
+ }
+ conn.shutdown(function(err) {
+ assert.ifError(err);
+ assert.equal(cbcount, expected);
+ test.finish();
+ });
+};
==============================================================================
Revision: 2184501cc648
Author: Christoph Tavan <de...@tavan.de>
Date: Thu Jan 26 14:27:35 2012
Log: Cleanup test_driver.js (remove some console.log and trailing
whitespaces)
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648
Modified:
/test/test_driver.js
=======================================
--- /test/test_driver.js Thu Jan 26 14:25:26 2012
+++ /test/test_driver.js Thu Jan 26 14:27:35 2012
@@ -215,7 +215,6 @@
function executeCountQuery(callback) {
con.execute('SELECT COUNT(*) FROM CfLong', [], function(err, rows) {
assert.ifError(err);
- console.log(rows[0].cols);
assert.strictEqual(rows[0].cols[0].value, 5);
callback();
});
@@ -453,7 +452,6 @@
con.close();
assert.strictEqual(rows.rowCount(), 1);
var row = rows[0];
- console.log(row);
assert.strictEqual(row.key.toString('base64'),
binaryParams[2].toString('base64'));
assert.strictEqual(row.cols[0].name.toString('base64'),
binaryParams[0].toString('base64'));
assert.strictEqual(row.cols[0].value.toString('base64'),
binaryParams[1].toString('base64'));
==============================================================================
Revision: 1a6bfbb5136b
Author: Christoph Tavan <de...@tavan.de>
Date: Mon Jan 30 13:11:01 2012
Log: Listen for the drain event only once for shutdown, also reset
shuttingDown state
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Thu Jan 26 14:24:50 2012
+++ /lib/driver.js Mon Jan 30 13:11:01 2012
@@ -720,8 +720,11 @@
callback = callback || function() {};
// Close all open connections as soon as the pool has drained
- self.on('drain', function() {
- self._closeConnections(callback);
+ self.once('drain', function() {
+ self._closeConnections(function() {
+ self.shuttingDown = false;
+ callback();
+ });
});
// If no queries were running, emit the drain event immediately
==============================================================================
Revision: db75d6b18bb5
Author: gdusbabek <gd...@gmail.com>
Date: Mon Jan 30 14:58:56 2012
Log: * PooledConnection.connect() establishes connections prior to
execute()ing.
* Increase test timeout to 30s.
* Node version >= 0.6.7
* Increate connection setup timeout to 10s.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5
Modified:
/lib/driver.js
/package.json
/test/test_driver.js
=======================================
--- /lib/driver.js Mon Jan 30 13:11:01 2012
+++ /lib/driver.js Mon Jan 30 14:58:56 2012
@@ -250,7 +250,7 @@
}
};
- // 2) login.
+ // 2) learn.
var learn = function(cb) {
var timeoutId = setTimeout(function() {
if (timeoutId) {
@@ -579,6 +579,33 @@
return incrCount >= this.connections.length
&& !this.connections[this.current_node].isHealthy();
};
+
+/**
+ * Establishes connections to all hosts in the pool.
+ * @return callback expects err (if all the connect()s failed.
+ */
+PooledConnection.prototype.connect = function(callback) {
+ var self = this;
+ var errors = [];
+ async.forEach(self.connections, function doConnect(con, callback) {
+ con.connect(function(err) {
+ if (err) {
+ errors.push(err);
+ }
+ callback();
+ });
+ }, function() {
+ if (errors.length === self.connections.length) {
+ var error = new Error('There were errors connecting to every
connection');
+ error._individualErrors = errors;
+ callback(error);
+ } else {
+ callback();
+ }
+ });
+};
+
+
/**
* executes any query
* @param query any CQL statement with '?' placeholders.
@@ -652,12 +679,14 @@
var tryStart = new Date().getTime();
var con = null;
var allBad = false;
+ var timedOut = false;
var takens = [];
async.whilst(function truthTest() {
// should the timeout of getting a single connection be the sum of all
connections? Think of a scenario where the
// timeout is N, but the first X nodes are unresponsive. You still
want to allow access to the subsequent good
// nodes.
- return !allBad && con === null && (new Date().getTime() - tryStart) <
(self.timeout * self.connections.length);
+ timedOut = (new Date().getTime() - tryStart) >= (self.timeout *
self.connections.length)
+ return !allBad && con === null && !timedOut;
}, function tryConnect(callback) {
var c = self.connections[self.current_node];
allBad = self._incr();
@@ -693,7 +722,9 @@
callback();
}
}, function whenDone(err) {
- if (allBad && !err) {
+ if (timedOut && !err) {
+ err = new Error('Timed out waiting for connection ' + (self.timeout
* self.connections.length));
+ } else if (allBad && !err) {
err = new Error('All connections are unhealthy.');
} else if (!con && !err) {
err = new Error('connection was not set');
=======================================
--- /package.json Thu Jan 26 14:40:08 2012
+++ /package.json Mon Jan 30 14:58:56 2012
@@ -20,10 +20,10 @@
"lib": "lib"
},
"scripts": {
- "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js
test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks"
+ "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js
test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks
--timeout 30000"
},
"engines": {
- "node": ">= 0.4.0"
+ "node": ">= 0.6.7"
},
"dependencies": {
"async": ">= 0.1.12",
=======================================
--- /test/test_driver.js Thu Jan 26 14:27:35 2012
+++ /test/test_driver.js Mon Jan 30 14:58:56 2012
@@ -1010,11 +1010,13 @@
exports.testPooledConnectionLoad = function(test, assert) {
var hosts = ['127.0.0.1:19170'];
- var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1'});
+ var conn = new PooledConnection({'hosts':
hosts, 'keyspace': 'Keyspace1', 'timeout': 10000});
var count = 3000;
async.waterfall([
+ // establish connections prior to executing statements.
+ //conn.connect.bind(conn),
function(cb) {
conn.execute('TRUNCATE CfUtf8', [], cb);
},
==============================================================================
Revision: 00b61ccdde0c
Author: gdusbabek <gd...@gmail.com>
Date: Mon Jan 30 14:59:17 2012
Log: update CHANGES.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c
Modified:
/CHANGES
=======================================
--- /CHANGES Mon Jan 30 11:08:11 2012
+++ /CHANGES Mon Jan 30 14:59:17 2012
@@ -1,5 +1,7 @@
Changes with cassandra-client X.Y.Z
+- Put drain back.
(http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=32)
+
- Allow blob updates
(http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=28)
- Remove logmagic dependency, see README.md for how to capture logging
events.