You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/01/23 18:30:20 UTC

[cassandra-node] push by gdusbabek@gmail.com - fix merge conflict on 2012-01-23 17:29 GMT

Revision: b94f6bdb675c
Author:   Gary Dusbabek <gd...@gmail.com>
Date:     Mon Jan 23 09:29:06 2012
Log:      fix merge conflict

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=b94f6bdb675c

Modified:
  /lib/driver.js

=======================================
--- /lib/driver.js	Sat Jan 21 15:42:46 2012
+++ /lib/driver.js	Mon Jan 23 09:29:06 2012
@@ -21,6 +21,7 @@
  var logCql = require('logmagic').local('node-cassandra-client.driver.cql');
  var logTiming =  
require('logmagic').local('node-cassandra-client.driver.timing');

+var util = require('util');
  var constants = require('constants');
  var Buffer = require('buffer').Buffer;
  var EventEmitter = require('events').EventEmitter;
@@ -181,193 +182,6 @@
    return this._colCount;
  };

-/**
- * Perform queries against a pool of open connections.
- *
- * Accepts a single argument of an object used to configure the new PooledConnection
- * instance.  The config object supports the following attributes:
- *
- *         hosts : List of strings in host:port format.
- *      keyspace : Keyspace name.
- *          user : User for authentication (optional).
- *          pass : Password for authentication (optional).
- *       maxSize : Maximum number of connection to pool (optional).
- *    idleMillis : Idle connection timeout in milliseconds (optional).
- *
- * Example:
- *
- *   var pool = new PooledConnection({
- *     hosts      : ['host1:9160', 'host2:9170', 'host3', 'host4'],
- *     keyspace   : 'database',
- *     user       : 'mary',
- *     pass       : 'qwerty',
- *     maxSize    : 25,
- *     idleMillis : 30000
- *   });
- *
- * @param config an object used to control the creation of new instances.
- */
-var PooledConnection = module.exports.PooledConnection = function(config) {
-  config = config || {};
-  this.nodes = [];
-  this.holdFor = 10000;
-  this.current_node = 0;
-  this.use_bigints = config.use_bigints ? true : false;
-  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
-  this.log_time = config.log_time || false;
-
-  // Construct a list of nodes from hosts in <host>:<port> form
-  for (var i = 0; i < config.hosts.length; i++) {
-    var hostSpec = config.hosts[i];
-    if (!hostSpec) { continue; }
-    var host = hostSpec.split(':');
-    if (host.length > 2) {
-      log.warn('malformed host entry "' + hostSpec + '" (skipping)');
-    }
-    log.debug("adding " + hostSpec + " to working node list");
-    this.nodes.push([host[0], (isNaN(host[1])) ? 9160 : host[1]]);
-  }
-
-  var self = this;
-  var maxSize = isNaN(config.maxSize) ? 25 : config.maxsize;
-  var idleMillis = isNaN(config.idleMillis) ? 30000 : config.idleMillis;
-
-  this.pool = genericPool.Pool({
-    name    : 'Connection',
-    create  : function(callback) {
-      // Advance through the set of configured nodes
-      if ((self.current_node + 1) >= self.nodes.length) {
-        self.current_node = 0;
-      } else {
-        self.current_node++;
-      }
-
-      var tries = self.nodes.length;
-
-	    function retry(curNode) {
-	      tries--;
-
-	      if ((curNode + 1) >= self.nodes.length) {
-          curNode = 0;
-        } else {
-          curNode++;
-        }
-
-	      var node = self.nodes[curNode];
-	      // Skip over any nodes known to be bad
-	      if (node.holdUntil > (new Date().getTime())) {
-	        return retry(curNode);
-	      }
-
-	      var conn = new Connection({host: node[0],
-                                   port: node[1],
-                                   keyspace: config.keyspace,
-                                   user: config.user,
-                                   pass: config.pass,
-                                   use_bigints: self.use_bigints,
-                                   timeout: self.timeout,
-                                   log_time: self.log_time});
-
-	      conn.connect(function(err) {
-	        if (!err) {                   // Success, we're connected
-	          callback(conn);
-	        } else if (tries > 0) {       // Fail, mark node inactive and  
retry
-	          log.err("Unabled to connect to " + node[0] + ":" + node[1] + "  
(skipping)");
-	          node.holdUntil = new Date().getTime() + self.holdFor;
-	          retry(curNode);
-	        } else {                      // Exhausted all options
-	          callback(err);
-	        }
-	      });
-	    }
-	    retry(self.current_node);
-	  },
-	  destroy : function(conn) { conn.close(); },
-	  max     : maxSize,
-	  idleTimeoutMillis : idleMillis,
-	  log : false
-  });
-};
-
-/**
- * executes any query
- * @param query any CQL statement with '?' placeholders.
- * @param args array of arguments that will be bound to the query.
- * @param callback executed when the query returns. the callback takes a different number of arguments depending on the
- * type of query:
- *    SELECT (single row): callback(err, row)
- *    SELECT (mult rows) : callback(err, rows)
- *    SELECT (count)     : callback(err, count)
- *    UPDATE             : callback(err)
- *    DELETE             : callback(err)
- */
-PooledConnection.prototype.execute = function(query, args, callback) {
-  var self = this;
-  var seen = false;
-
-  var exe = function(errback) {
-    async.waterfall([
-      function acquireConnFromPool(callback) {
-        self.pool.acquire(function(err, conn) {
-          callback(err, conn);
-        });
-      },
-
-      function executeQuery(conn, callback) {
-        conn.execute(query, args, function(err, res) {
-          callback(err, res, conn);
-        });
-      }
-    ],
-
-    function(err, res, conn) {
-      var connectionInfo;
-
-      if (conn) {
-        self.pool.release(conn);
-      }
-
-      if (err) {
-        if (err.hasOwnProperty('name') && contains(appExceptions,  
err.name)) {
-          callback(err, null);
-        }
-        else {
-          if (!seen) {
-            errback();
-          }
-          else {
-            connectionInfo = (conn) ? conn.connectionInfo : null;
-            err = amendError(err, connectionInfo);
-            callback(err, res);
-          }
-        }
-      }
-      else {
-        callback(err, res);
-      }
-    });
-  };
-
-  var retry = function() {
-    seen = true;
-    exe(retry);
-  };
-
-  exe(retry);
-};
-
-/**
- * Signal the pool to shutdown.  Once called, no new requests (read: execute())
- * can be made. When all pending requests have terminated, the callback is run.
- *
- * @param callback called when the pool is fully shutdown
- */
-PooledConnection.prototype.shutdown = function(callback) {
-  var self = this;
-  this.pool.drain(function() {
-    self.pool.destroyAllNow(callback);
-  });
-};

  /**
   * @param options: valid parts are:
@@ -377,7 +191,6 @@
    options = options || {};
    log.info('connecting ' + options.host + ':' + options.port);
    this.validators = {};
-  this.con = thrift.createConnection(options.host, options.port);
    this.client = null;
    this.connectionInfo = options;
    this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
@@ -392,6 +205,8 @@
    var self = this,
        timeoutId;

+  // build connection here, so that timeouts on bad hosts happen now and  
not in the constructor.
+  this.con = thrift.createConnection(self.connectionInfo.host,  
self.connectionInfo.port);
    this.con.on('error', function(err) {
      clearTimeout(timeoutId);
      amendError(err, self.connectionInfo);
@@ -546,7 +361,19 @@
      start = new Date().getTime();
      logCql.trace('CQL QUERY', {'query': query, 'parameterized_query':  
cqlString, 'args': args});

-    this.client.execute_cql_query(cql, ttypes.Compression.NONE,  
function(err, res) {
+    // if a connection dies at the right place, execute_cql_query never  
returns. make sure the callback gets called.
+    var timeoutId = setTimeout(function() {
+      callback(new Error('Connection timed out'));
+      timeoutId = null;
+    }, this.timeout); // todo: should we disambiguate connection timeout  
vs query timeout?
+    self.client.execute_cql_query(cql, ttypes.Compression.NONE,  
function(err, res) {
+      if (!timeoutId) {
+        log.warn('query returned after timeout: ' + cql);
+        return;
+      } else {
+        clearTimeout(timeoutId);
+      }
+
        end = new Date().getTime();
        diff = (end - start);
        if (self.connectionInfo.log_time) {
@@ -578,8 +405,265 @@
            callback(null, res.num);
          } else if (res.type === ttypes.CqlResultType.VOID) {
            callback(null);
+        } else {
+          callback(new Error('Execution unexpectedly got here. Result type  
is ' + res.type));
          }
        }
      });
    }
  };
+
+
+/**
+ * A Connection managed by PooledConnection. It offers the same service interface
+ * as a regular Connection, but its constructor only caches the options and
+ * initialises pool bookkeeping; running the Connection constructor is delayed
+ * to connect().
+ *
+ * Bookkeeping fields:
+ *   taken       - true while the connection is being used in a query.
+ *   connected   - true once connect() has succeeded.
+ *   unhealthyAt - time in ms at which the connection went bad; 0 while healthy.
+ *
+ * NOTE(review): the deferral was motivated by Connection() doing socket work; in
+ * this revision the thrift connection appears to be created in
+ * Connection.prototype.connect rather than in the constructor, so that reason
+ * may no longer hold. Confirm against the full Connection constructor.
+ *
+ * @param options connection options (host, port, keyspace, user, pass,
+ *   use_bigints, timeout, log_time as supplied by PooledConnection). Modified in
+ *   place: options.staleThreshold is set to 10000 (ms) whenever it is falsy.
+ */
+var ConnectionInPool = module.exports.ConnectionInPool = function(options)  
{
+  options.staleThreshold = options.staleThreshold || 10000;
+  // cache options so that thrift setup can happen later.
+  this._options = options;
+  this.taken = false; // true when being used in a query.
+  this.connected = false; // true when connected.
+  this.unhealthyAt = 0; // timestamp this connection went bad.
+} // NOTE(review): no ';' after this function expression; ASI inserts one before util.inherits.
+util.inherits(ConnectionInPool, Connection);
+
+/**
+ * Connects to the remote endpoint.
+ *
+ * First runs the Connection constructor on the cached options (the setup that
+ * the ConnectionInPool constructor deferred), then delegates to
+ * Connection.prototype.connect. When that completes, the outcome is recorded in
+ * the pool bookkeeping fields: connected becomes !err, and unhealthyAt is set to
+ * the current time on error or reset to 0 on success.
+ *
+ * NOTE(review): calling connect() again re-runs the Connection constructor,
+ * which reinitialises fields it sets (e.g. validators and client).
+ *
+ * @param callback called as callback(err) once the connect attempt completes;
+ *   err is falsy on success.
+ */
+ConnectionInPool.prototype.connect = function(callback) {
+  var self = this;
+  Connection.call(this, this._options);
+  Connection.prototype.connect.call(this, function(err) {
+    self.connected = !err;
+    self.unhealthyAt = err ? new Date().getTime() : 0;
+    callback(err);
+  });
+};
+
+ConnectionInPool.prototype.isHealthy = function() { // true when unhealthyAt is 0, i.e. the connection is not currently marked bad.
+    return this.unhealthyAt === 0;
+} // NOTE(review): no ';' after this function expression (ASI inserts it).
+
+/**
+ * A 'stale unhealthy' node is one that has been marked unhealthy for longer than
+ * options.staleThreshold milliseconds (10000 unless configured). After that
+ * period, it is considered safe to retry the connection.
+ *
+ * @return true if the connection is unhealthy and was marked so more than
+ *   staleThreshold ms ago; false otherwise (including when it is healthy).
+ */
+ConnectionInPool.prototype.isStaleUnhealthy = function() {
+  return !this.isHealthy() && new Date().getTime() - this.unhealthyAt >  
this._options.staleThreshold;
+} // NOTE(review): no ';' after this function expression (ASI inserts it).
+
+/**
+ * Perform queries against a pool of connections, one per configured host.
+ *
+ * Accepts a single argument of an object used to configure the new PooledConnection
+ * instance.  The config object supports the following attributes:
+ *
+ *         hosts : List of strings in host:port format (required). The port
+ *                 defaults to 9160 when it is missing or not numeric; entries
+ *                 containing more than one ':' are logged and skipped, and
+ *                 empty entries are ignored.
+ *      keyspace : Keyspace name.
+ *          user : User for authentication (optional).
+ *          pass : Password for authentication (optional).
+ *   use_bigints : Passed to every connection (optional, coerced to a boolean).
+ *       timeout : Timeout in milliseconds used by every connection (optional,
+ *                 falls back to DEFAULT_CONNECTION_TIMEOUT when falsy).
+ *      log_time : Passed to every connection (optional, defaults to false).
+ *       maxSize : Maximum number of connections to pool (optional).
+ *    idleMillis : Idle connection timeout in milliseconds (optional).
+ *
+ * NOTE(review): maxSize and idleMillis are not read by this constructor; the
+ * pool holds exactly one ConnectionInPool per accepted host, created
+ * unconnected. Also, config.hosts is dereferenced unconditionally, so omitting
+ * it throws even though config itself defaults to {}.
+ *
+ * Example:
+ *
+ *   var pool = new PooledConnection({
+ *     hosts      : ['host1:9160', 'host2:9170', 'host3', 'host4'],
+ *     keyspace   : 'database',
+ *     user       : 'mary',
+ *     pass       : 'qwerty',
+ *     maxSize    : 25,
+ *     idleMillis : 30000
+ *   });
+ *
+ * @param config an object used to control the creation of new instances.
+ */
+var PooledConnection = module.exports.PooledConnection = function(config) {
+  var self = this;
+  config = config || {};
+  this.connections = [];
+  this.current_node = 0;
+  this.use_bigints = config.use_bigints ? true : false;
+  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
+  this.log_time = config.log_time || false;
+
+  // Construct a list of nodes from hosts in <host>:<port> form
+  for (var i = 0; i < config.hosts.length; i++) {
+    var hostSpec = config.hosts[i];
+    if (!hostSpec) { continue; }
+    var host = hostSpec.split(':');
+    if (host.length > 2) {
+      log.warn('malformed host entry "' + hostSpec + '" (skipping)');
+      continue;
+    }
+    log.debug("adding " + hostSpec + " to working node list");
+    this.connections.push(new ConnectionInPool({
+      host: host[0],
+      port: (isNaN(host[1])) ? 9160 : host[1],
+      keyspace: config.keyspace,
+      user: config.user,
+      pass: config.pass,
+      use_bigints: self.use_bigints,
+      timeout: self.timeout,
+      log_time: self.log_time
+    }));
+  }
+};
+
+/**
+ * Advances current_node round-robin to the next healthy connection, checking at
+ * most connections.length slots. As a side effect, the first stale-unhealthy
+ * connection it lands on is reset (taken, connected and unhealthyAt cleared), so
+ * it appears unconnected and counts as healthy again; it is not reconnected
+ * here.
+ *
+ * @return boolean true if every slot was checked and the connection that
+ *   current_node ends on is still unhealthy, i.e. all nodes are unhealthy.
+ *
+ * NOTE(review): with an empty connections list the loop never runs and the
+ * return expression calls isHealthy() on this.connections[this.current_node],
+ * which is undefined, so this throws a TypeError.
+ */
+PooledConnection.prototype._incr = function() {
+  var incrCount = 0;
+  while (incrCount < this.connections.length) {
+    incrCount += 1;
+    this.current_node = (this.current_node + 1) % this.connections.length;
+    if (this.connections[this.current_node]) {
+      if (this.connections[this.current_node].isHealthy()) {
+        break;
+      } else if (this.connections[this.current_node].isStaleUnhealthy()) {
+        // unhealthy and stale, so let reset the node (appears as if  
unconnected).
+        this.connections[this.current_node].taken = false;
+        this.connections[this.current_node].connected = false;
+        this.connections[this.current_node].unhealthyAt = 0;
+        break;
+      } else {
+        //`console.log('not healthy ' + this.current_node + ',' +  
incrCount);
+      }
+    }
+  }
+  // all nodes are unhealthy if we looped around and no healthy nodes were  
found.
+  return incrCount >= this.connections.length  
&& !this.connections[this.current_node].isHealthy();
+};
+
+/**
+ * Executes any query on the next available pooled connection.
+ *
+ * Errors whose name is listed in appExceptions are application errors and are
+ * passed straight to the callback. Any other error returned by the query, or any
+ * exception thrown while issuing it, marks the connection unhealthy and the query
+ * is retried by calling execute() again. This continues until a call succeeds,
+ * fails with an application error, or _getNextCon returns an error.
+ *
+ * NOTE(review): because non-application errors are retried, a non-idempotent
+ * statement may be applied more than once if an earlier attempt actually reached
+ * the server. Also, if con.execute ever invokes its callback synchronously, an
+ * exception thrown by the caller's callback would be caught below, treated as a
+ * connection failure, and the query retried.
+ *
+ * @param query any CQL statement with '?' placeholders.
+ * @param args array of arguments that will be bound to the query.
+ * @param callback executed when the query returns. The callback takes a different number of arguments depending on the
+ * type of query:
+ *    SELECT (single row): callback(err, row)
+ *    SELECT (mult rows) : callback(err, rows)
+ *    SELECT (count)     : callback(err, count)
+ *    UPDATE             : callback(err)
+ *    DELETE             : callback(err)
+ */
+PooledConnection.prototype.execute = function(query, args, callback) {
+  var self = this;
+  self._getNextCon(function(err, con) {
+    if (err) {
+      callback(err, null);
+    } else {
+      try {
+        con.taken = true;
+        con.execute(query, args, function(err, result) {
+          con.taken = false;
+          var recoverableError = null;
+          if (err) {
+            if (err.hasOwnProperty('name') && contains(appExceptions,  
err.name)) {
+              callback(err, null);
+              return;
+            } else {
+              recoverableError = err;
+            }
+            if (recoverableError) {
+              con.unhealthyAt = new Date().getTime();
+              con.taken = false;
+              log.warn('setting unhealthy from execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+              // try again.
+              self.execute(query, args, callback);
+            }
+          } else {
+            callback(null, result);
+          }
+        });
+      } catch (err) {
+        // individual connection has failed.
+        con.unhealthyAt = new Date().getTime();
+        con.taken = false;
+        log.warn('setting unhealthy from catch outside execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+        // try again.
+        self.execute(query, args, callback);
+      }
+    }
+  });
+};
+
+/**
+ * Gets the next untaken connection, connecting it first if it is not yet
+ * connected. Errors when all connections are bad, or when the loop times out.
+ *
+ * Candidates are read from current_node, and the pointer is then advanced with
+ * _incr() for each attempt. Behaviour depends on the candidate's state:
+ *   - taken: polled again after a 10 ms delay;
+ *   - unhealthy: skipped;
+ *   - not connected: connected now. Connection errors other than
+ *     NotFoundException are swallowed so that the next node is tried.
+ * The whole search is bounded by self.timeout multiplied by the number of
+ * connections.
+ *
+ * @param callback called as callback(err, con). err is:
+ *   - 'All connections are unhealthy.' when _incr() reports every node
+ *     unhealthy;
+ *   - 'connection was not set' when the time budget runs out (or there are no
+ *     connections);
+ *   - the NotFoundException raised while connecting.
+ *   The connection is not marked taken here; execute() sets con.taken itself.
+ */
+PooledConnection.prototype._getNextCon = function(callback) {
+  var self = this;
+  var tryStart = new Date().getTime();
+  var con = null;
+  var allBad = false;
+  var takens = [];
+  async.whilst(function truthTest() {
+    // should the timeout of getting a single connection be the sum of all  
connections?  Think of a scenario where the
+    // timeout is N, but the first X nodes are unresponsive.  You still  
want to allow access to the subsequent good
+    // nodes.
+    return !allBad && con === null && (new Date().getTime() - tryStart) <  
(self.timeout * self.connections.length);
+  }, function tryConnect(callback) {
+    var c = self.connections[self.current_node];
+    allBad = self._incr();
+    if (c.taken) {
+      takens[self.current_node] = takens[self.current_node] ===  
undefined ? 1 : takens[self.current_node] + 1;
+      if (takens[self.current_node] > 0) { // NOTE(review): always true right after the increment above, so taken connections always wait 10 ms and the else branch is unreachable; takens is also keyed by the node _incr() advanced to, not by c.
+        // we've tried this node > 1 times and it still isn't available,  
this means that all other nodes are occupied
+        // or down (we've looped around all nodes).  Continually checking  
will blow the stack, so lets wait
+        // 10 ms. before checking again.
+        setTimeout(callback, 10);
+      } else {
+        callback();
+      }
+    } else if (c.unhealthyAt > 0) {
+      callback();
+    } else if (!c.connected) {
+      c.connect(function(err) {
+        if (c.connected) {
+          con = c;
+        }
+        // some errors we pass back. some we swallow and iterate over.
+        if (err instanceof ttypes.NotFoundException) {
+          callback(err, null);
+        } else if (err && err.errno && err.errno === constants.ETIMEDOUT) { // handled exactly like the generic branch below: swallow and try the next node
+          callback();
+        } else {
+          callback();
+        }
+      });
+    } else {
+      con = c;
+      callback();
+    }
+  }, function whenDone(err) {
+    if (allBad && !err) {
+      err = new Error('All connections are unhealthy.');
+    } else if (!con && !err) {
+      err = new Error('connection was not set');
+    }
+    callback(err, con);
+  });
+};
+
+/**
+ * Shuts down the pool by calling close() on every connection that is currently
+ * connected. The callback, if given, is then invoked synchronously.
+ *
+ * NOTE(review): this does not wait for pending execute() calls to finish and
+ * does not prevent new ones (see the todo below); connections are closed even
+ * while taken.
+ *
+ * @param callback optional; called after close() has been called on every
+ *   connected connection.
+ */
+PooledConnection.prototype.shutdown = function(callback) {
+  // todo: we need to be able to let pending execute()s finish and block  
executes from happening while shutting down.
+  this.connections.forEach(function(con) {
+    if (con.connected) {
+      con.close();
+    }
+  });
+  if (callback) {
+    callback();
+  }
+};
+