You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/01/23 18:30:20 UTC
[cassandra-node] push by gdusbabek@gmail.com - fix merge conflict on
2012-01-23 17:29 GMT
Revision: b94f6bdb675c
Author: Gary Dusbabek <gd...@gmail.com>
Date: Mon Jan 23 09:29:06 2012
Log: fix merge conflict
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=b94f6bdb675c
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Sat Jan 21 15:42:46 2012
+++ /lib/driver.js Mon Jan 23 09:29:06 2012
@@ -21,6 +21,7 @@
var logCql = require('logmagic').local('node-cassandra-client.driver.cql');
var logTiming =
require('logmagic').local('node-cassandra-client.driver.timing');
+var util = require('util');
var constants = require('constants');
var Buffer = require('buffer').Buffer;
var EventEmitter = require('events').EventEmitter;
@@ -181,193 +182,6 @@
return this._colCount;
};
-/**
- * Perform queries against a pool of open connections.
- *
- * Accepts a single argument of an object used to configure the new
PooledConnection
- * instance. The config object supports the following attributes:
- *
- * hosts : List of strings in host:port format.
- * keyspace : Keyspace name.
- * user : User for authentication (optional).
- * pass : Password for authentication (optional).
- * maxSize : Maximum number of connection to pool (optional).
- * idleMillis : Idle connection timeout in milliseconds (optional).
- *
- * Example:
- *
- * var pool = new PooledConnection({
- * hosts : ['host1:9160', 'host2:9170', 'host3', 'host4'],
- * keyspace : 'database',
- * user : 'mary',
- * pass : 'qwerty',
- * maxSize : 25,
- * idleMillis : 30000
- * });
- *
- * @param config an object used to control the creation of new instances.
- */
-var PooledConnection = module.exports.PooledConnection = function(config) {
- config = config || {};
- this.nodes = [];
- this.holdFor = 10000;
- this.current_node = 0;
- this.use_bigints = config.use_bigints ? true : false;
- this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
- this.log_time = config.log_time || false;
-
- // Construct a list of nodes from hosts in <host>:<port> form
- for (var i = 0; i < config.hosts.length; i++) {
- var hostSpec = config.hosts[i];
- if (!hostSpec) { continue; }
- var host = hostSpec.split(':');
- if (host.length > 2) {
- log.warn('malformed host entry "' + hostSpec + '" (skipping)');
- }
- log.debug("adding " + hostSpec + " to working node list");
- this.nodes.push([host[0], (isNaN(host[1])) ? 9160 : host[1]]);
- }
-
- var self = this;
- var maxSize = isNaN(config.maxSize) ? 25 : config.maxsize;
- var idleMillis = isNaN(config.idleMillis) ? 30000 : config.idleMillis;
-
- this.pool = genericPool.Pool({
- name : 'Connection',
- create : function(callback) {
- // Advance through the set of configured nodes
- if ((self.current_node + 1) >= self.nodes.length) {
- self.current_node = 0;
- } else {
- self.current_node++;
- }
-
- var tries = self.nodes.length;
-
- function retry(curNode) {
- tries--;
-
- if ((curNode + 1) >= self.nodes.length) {
- curNode = 0;
- } else {
- curNode++;
- }
-
- var node = self.nodes[curNode];
- // Skip over any nodes known to be bad
- if (node.holdUntil > (new Date().getTime())) {
- return retry(curNode);
- }
-
- var conn = new Connection({host: node[0],
- port: node[1],
- keyspace: config.keyspace,
- user: config.user,
- pass: config.pass,
- use_bigints: self.use_bigints,
- timeout: self.timeout,
- log_time: self.log_time});
-
- conn.connect(function(err) {
- if (!err) { // Success, we're connected
- callback(conn);
- } else if (tries > 0) { // Fail, mark node inactive and
retry
- log.err("Unabled to connect to " + node[0] + ":" + node[1] + "
(skipping)");
- node.holdUntil = new Date().getTime() + self.holdFor;
- retry(curNode);
- } else { // Exhausted all options
- callback(err);
- }
- });
- }
- retry(self.current_node);
- },
- destroy : function(conn) { conn.close(); },
- max : maxSize,
- idleTimeoutMillis : idleMillis,
- log : false
- });
-};
-
-/**
- * executes any query
- * @param query any CQL statement with '?' placeholders.
- * @param args array of arguments that will be bound to the query.
- * @param callback executed when the query returns. the callback takes a
different number of arguments depending on the
- * type of query:
- * SELECT (single row): callback(err, row)
- * SELECT (mult rows) : callback(err, rows)
- * SELECT (count) : callback(err, count)
- * UPDATE : callback(err)
- * DELETE : callback(err)
- */
-PooledConnection.prototype.execute = function(query, args, callback) {
- var self = this;
- var seen = false;
-
- var exe = function(errback) {
- async.waterfall([
- function acquireConnFromPool(callback) {
- self.pool.acquire(function(err, conn) {
- callback(err, conn);
- });
- },
-
- function executeQuery(conn, callback) {
- conn.execute(query, args, function(err, res) {
- callback(err, res, conn);
- });
- }
- ],
-
- function(err, res, conn) {
- var connectionInfo;
-
- if (conn) {
- self.pool.release(conn);
- }
-
- if (err) {
- if (err.hasOwnProperty('name') && contains(appExceptions,
err.name)) {
- callback(err, null);
- }
- else {
- if (!seen) {
- errback();
- }
- else {
- connectionInfo = (conn) ? conn.connectionInfo : null;
- err = amendError(err, connectionInfo);
- callback(err, res);
- }
- }
- }
- else {
- callback(err, res);
- }
- });
- };
-
- var retry = function() {
- seen = true;
- exe(retry);
- };
-
- exe(retry);
-};
-
-/**
- * Signal the pool to shutdown. Once called, no new requests (read:
execute())
- * can be made. When all pending requests have terminated, the callback is
run.
- *
- * @param callback called when the pool is fully shutdown
- */
-PooledConnection.prototype.shutdown = function(callback) {
- var self = this;
- this.pool.drain(function() {
- self.pool.destroyAllNow(callback);
- });
-};
/**
* @param options: valid parts are:
@@ -377,7 +191,6 @@
options = options || {};
log.info('connecting ' + options.host + ':' + options.port);
this.validators = {};
- this.con = thrift.createConnection(options.host, options.port);
this.client = null;
this.connectionInfo = options;
this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
@@ -392,6 +205,8 @@
var self = this,
timeoutId;
+ // build connection here, so that timeouts on bad hosts happen now and
not in the constructor.
+ this.con = thrift.createConnection(self.connectionInfo.host,
self.connectionInfo.port);
this.con.on('error', function(err) {
clearTimeout(timeoutId);
amendError(err, self.connectionInfo);
@@ -546,7 +361,19 @@
start = new Date().getTime();
logCql.trace('CQL QUERY', {'query': query, 'parameterized_query':
cqlString, 'args': args});
- this.client.execute_cql_query(cql, ttypes.Compression.NONE,
function(err, res) {
+ // if a connection dies at the right place, execute_cql_query never
returns. make sure the callback gets called.
+ var timeoutId = setTimeout(function() {
+ callback(new Error('Connection timed out'));
+ timeoutId = null;
+ }, this.timeout); // todo: should we disambiguate connection timeout
vs query timeout?
+ self.client.execute_cql_query(cql, ttypes.Compression.NONE,
function(err, res) {
+ if (!timeoutId) {
+ log.warn('query returned after timeout: ' + cql);
+ return;
+ } else {
+ clearTimeout(timeoutId);
+ }
+
end = new Date().getTime();
diff = (end - start);
if (self.connectionInfo.log_time) {
@@ -578,8 +405,265 @@
callback(null, res.num);
} else if (res.type === ttypes.CqlResultType.VOID) {
callback(null);
+ } else {
+ callback(new Error('Execution unexpectedly got here. Result type
is ' + res.type));
}
}
});
}
};
+
+
+/**
+ * Pooled connections behave a bit differently but offer the same service
interface as regular connections.
+ * This constructor behaves differently from the normal Connection since
Connection() does some socket work;
+ * that work is delayed to connect() here.
+ */
+var ConnectionInPool = module.exports.ConnectionInPool = function(options)
{
+ options.staleThreshold = options.staleThreshold || 10000;
+ // cache options so that thrift setup can happen later.
+ this._options = options;
+ this.taken = false; // true when being used in a query.
+ this.connected = false; // true when connected.
+ this.unhealthyAt = 0; // timestamp this connection went bad.
+}
+util.inherits(ConnectionInPool, Connection);
+
+/**
+ * connects to the remote endpoint.
+ * @param callback
+ */
+ConnectionInPool.prototype.connect = function(callback) {
+ var self = this;
+ Connection.call(this, this._options);
+ Connection.prototype.connect.call(this, function(err) {
+ self.connected = !err;
+ self.unhealthyAt = err ? new Date().getTime() : 0;
+ callback(err);
+ });
+};
+
+ConnectionInPool.prototype.isHealthy = function() {
+ return this.unhealthyAt === 0;
+}
+
+/**
+ * a 'stale unhealthy' node is a node that has been bad for some period of
time. After that
+ * period, it is safe to retry the connection.
+ */
+ConnectionInPool.prototype.isStaleUnhealthy = function() {
+ return !this.isHealthy() && new Date().getTime() - this.unhealthyAt >
this._options.staleThreshold;
+}
+
+/**
+ * Perform queries against a pool of open connections.
+ *
+ * Accepts a single argument of an object used to configure the new
PooledConnection
+ * instance. The config object supports the following attributes:
+ *
+ * hosts : List of strings in host:port format.
+ * keyspace : Keyspace name.
+ * user : User for authentication (optional).
+ * pass : Password for authentication (optional).
+ * maxSize : Maximum number of connection to pool (optional).
+ * idleMillis : Idle connection timeout in milliseconds (optional).
+ *
+ * Example:
+ *
+ * var pool = new PooledConnection({
+ * hosts : ['host1:9160', 'host2:9170', 'host3', 'host4'],
+ * keyspace : 'database',
+ * user : 'mary',
+ * pass : 'qwerty',
+ * maxSize : 25,
+ * idleMillis : 30000
+ * });
+ *
+ * @param config an object used to control the creation of new instances.
+ */
+var PooledConnection = module.exports.PooledConnection = function(config) {
+ var self = this;
+ config = config || {};
+ this.connections = [];
+ this.current_node = 0;
+ this.use_bigints = config.use_bigints ? true : false;
+ this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
+ this.log_time = config.log_time || false;
+
+ // Construct a list of nodes from hosts in <host>:<port> form
+ for (var i = 0; i < config.hosts.length; i++) {
+ var hostSpec = config.hosts[i];
+ if (!hostSpec) { continue; }
+ var host = hostSpec.split(':');
+ if (host.length > 2) {
+ log.warn('malformed host entry "' + hostSpec + '" (skipping)');
+ continue;
+ }
+ log.debug("adding " + hostSpec + " to working node list");
+ this.connections.push(new ConnectionInPool({
+ host: host[0],
+ port: (isNaN(host[1])) ? 9160 : host[1],
+ keyspace: config.keyspace,
+ user: config.user,
+ pass: config.pass,
+ use_bigints: self.use_bigints,
+ timeout: self.timeout,
+ log_time: self.log_time
+ }));
+ }
+};
+
+/**
+ * increment the current node pointer, skipping over any bad nodes. has a
side-effect of resetting
+ * unhealthy nodes that are stale (but not reconnecting them).
+ * @return boolean indicating if all nodes are unhealthy.
+ */
+PooledConnection.prototype._incr = function() {
+ var incrCount = 0;
+ while (incrCount < this.connections.length) {
+ incrCount += 1;
+ this.current_node = (this.current_node + 1) % this.connections.length;
+ if (this.connections[this.current_node]) {
+ if (this.connections[this.current_node].isHealthy()) {
+ break;
+ } else if (this.connections[this.current_node].isStaleUnhealthy()) {
+ // unhealthy and stale, so let's reset the node (appears as if
unconnected).
+ this.connections[this.current_node].taken = false;
+ this.connections[this.current_node].connected = false;
+ this.connections[this.current_node].unhealthyAt = 0;
+ break;
+ } else {
+ //`console.log('not healthy ' + this.current_node + ',' +
incrCount);
+ }
+ }
+ }
+ // all nodes are unhealthy if we looped around and no healthy nodes were
found.
+ return incrCount >= this.connections.length
&& !this.connections[this.current_node].isHealthy();
+};
+
+/**
+ * executes any query
+ * @param query any CQL statement with '?' placeholders.
+ * @param args array of arguments that will be bound to the query.
+ * @param callback executed when the query returns. the callback takes a
different number of arguments depending on the
+ * type of query:
+ * SELECT (single row): callback(err, row)
+ * SELECT (mult rows) : callback(err, rows)
+ * SELECT (count) : callback(err, count)
+ * UPDATE : callback(err)
+ * DELETE : callback(err)
+ */
+PooledConnection.prototype.execute = function(query, args, callback) {
+ var self = this;
+ self._getNextCon(function(err, con) {
+ if (err) {
+ callback(err, null);
+ } else {
+ try {
+ con.taken = true;
+ con.execute(query, args, function(err, result) {
+ con.taken = false;
+ var recoverableError = null;
+ if (err) {
+ if (err.hasOwnProperty('name') && contains(appExceptions,
err.name)) {
+ callback(err, null);
+ return;
+ } else {
+ recoverableError = err;
+ }
+ if (recoverableError) {
+ con.unhealthyAt = new Date().getTime();
+ con.taken = false;
+ log.warn('setting unhealthy from execute ' +
con.connectionInfo.host + ':' + con.connectionInfo.port);
+ // try again.
+ self.execute(query, args, callback);
+ }
+ } else {
+ callback(null, result);
+ }
+ });
+ } catch (err) {
+ // individual connection has failed.
+ con.unhealthyAt = new Date().getTime();
+ con.taken = false;
+ log.warn('setting unhealthy from catch outside execute ' +
con.connectionInfo.host + ':' + con.connectionInfo.port);
+ // try again.
+ self.execute(query, args, callback);
+ }
+ }
+ });
+};
+
+/** gets the next untaken connection. errors when all connections are bad,
or loop times out. */
+PooledConnection.prototype._getNextCon = function(callback) {
+ var self = this;
+ var tryStart = new Date().getTime();
+ var con = null;
+ var allBad = false;
+ var takens = [];
+ async.whilst(function truthTest() {
+ // should the timeout of getting a single connection be the sum of all
connections? Think of a scenario where the
+ // timeout is N, but the first X nodes are unresponsive. You still
want to allow access to the subsequent good
+ // nodes.
+ return !allBad && con === null && (new Date().getTime() - tryStart) <
(self.timeout * self.connections.length);
+ }, function tryConnect(callback) {
+ var c = self.connections[self.current_node];
+ allBad = self._incr();
+ if (c.taken) {
+ takens[self.current_node] = takens[self.current_node] ===
undefined ? 1 : takens[self.current_node] + 1;
+ if (takens[self.current_node] > 0) {
+ // we've tried this node more than once and it still isn't available;
this means that all other nodes are occupied
+ // or down (we've looped around all nodes). Continually checking
will blow the stack, so let's wait
+ // 10 ms before checking again.
+ setTimeout(callback, 10);
+ } else {
+ callback();
+ }
+ } else if (c.unhealthyAt > 0) {
+ callback();
+ } else if (!c.connected) {
+ c.connect(function(err) {
+ if (c.connected) {
+ con = c;
+ }
+ // some errors we pass back. some we swallow and iterate over.
+ if (err instanceof ttypes.NotFoundException) {
+ callback(err, null);
+ } else if (err && err.errno && err.errno === constants.ETIMEDOUT) {
+ callback();
+ } else {
+ callback();
+ }
+ });
+ } else {
+ con = c;
+ callback();
+ }
+ }, function whenDone(err) {
+ if (allBad && !err) {
+ err = new Error('All connections are unhealthy.');
+ } else if (!con && !err) {
+ err = new Error('connection was not set');
+ }
+ callback(err, con);
+ });
+};
+
+/**
+ * Signal the pool to shutdown. Once called, no new requests (read:
execute())
+ * can be made. When all pending requests have terminated, the callback is
run.
+ *
+ * @param callback called when the pool is fully shutdown
+ */
+PooledConnection.prototype.shutdown = function(callback) {
+ // todo: we need to be able to let pending execute()s finish and block
executes from happening while shutting down.
+ this.connections.forEach(function(con) {
+ if (con.connected) {
+ con.close();
+ }
+ });
+ if (callback) {
+ callback();
+ }
+};
+