You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/01/30 20:07:22 UTC

[cassandra-node] 2 new revisions pushed by gdusbabek@gmail.com on 2012-01-30 19:06 GMT

2 new revisions:

Revision: 9888be187a23
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:40:08 2012
Log:      Emit log events and remove logmagic dependency
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9888be187a23

Revision: 764dd54a7bc3
Author:   Gary Dusbabek <gd...@gmail.com>
Date:     Mon Jan 30 10:26:55 2012
Log:      Merge commit '9888be187a23a2a2b5ca8a01e7710b7e48f9f00c' into  
16/remove...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=764dd54a7bc3

==============================================================================
Revision: 9888be187a23
Author:   Christoph Tavan <de...@tavan.de>
Date:     Thu Jan 26 14:40:08 2012
Log:      Emit log events and remove logmagic dependency

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9888be187a23

Modified:
  /README.md
  /lib/driver.js
  /lib/system.js
  /package.json
  /test/test_driver.js

=======================================
--- /README.md	Fri Dec  2 13:23:47 2011
+++ /README.md	Thu Jan 26 14:40:08 2012
@@ -130,6 +130,17 @@
      // Shutting down a pool
      connection_pool.shutdown(function() { console.log("connection pool  
shutdown"); });

+### Logging
+Instances of `Connection()` and `PooledConnection()` are `EventEmitter`'s  
and emit `log` events:
+
+    var Connection = require('node-cassandra-client').Connection;
+    var con = new Connection({host:'cassandra-host', port:9160,  
keyspace:'Keyspace1', user:'user', pass:'password'});
+    con.on('log', function(level, message) {
+      console.log('log event: %s -- %j', level, message);
+    });
+
+The `level` being passed to the listener can be one of `debug`, `info`,  
`warn`, `error`, `timing` and `cql`. The `message` is usually a string, in  
the case of `timing` and `cql` it is an object that provides more detailed  
information.
+

  Things you should know about
  ============================
=======================================
--- /lib/driver.js	Mon Jan 23 11:22:55 2012
+++ /lib/driver.js	Thu Jan 26 14:40:08 2012
@@ -17,10 +17,6 @@

  /** node.js driver for Cassandra-CQL. */

-var log = require('logmagic').local('node-cassandra-client.driver');
-var logCql = require('logmagic').local('node-cassandra-client.driver.cql');
-var logTiming =  
require('logmagic').local('node-cassandra-client.driver.timing');
-
  var util = require('util');
  var constants = require('constants');
  var Buffer = require('buffer').Buffer;
@@ -187,12 +183,14 @@
   */
  var Connection = module.exports.Connection = function(options) {
    options = options || {};
-  log.info('connecting ' + options.host + ':' + options.port);
    this.validators = {};
    this.client = null;
    this.connectionInfo = options;
    this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
+
+  EventEmitter.call(this);
  };
+util.inherits(Connection, EventEmitter);


  /**
@@ -203,6 +201,8 @@
    var self = this,
        timeoutId;

+  self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':'  
+ self.connectionInfo.port);
+
    // build connection here, so that timeouts on bad hosts happen now and  
not in the constructor.
    this.con = thrift.createConnection(self.connectionInfo.host,  
self.connectionInfo.port);
    this.con.on('error', function(err) {
@@ -213,7 +213,7 @@

    this.con.on('close', function() {
      clearTimeout(timeoutId);
-    log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '  
is closed');
+    self.emit('log', 'info', self.connectionInfo.host + ':' +  
self.connectionInfo.port + ' is closed');
    });

    this.con.on('connect', function() {
@@ -357,7 +357,7 @@
          start, end, diff;

      start = new Date().getTime();
-    logCql.trace('CQL QUERY', {'query': query, 'parameterized_query':  
cqlString, 'args': args});
+    self.emit('log', 'cql', {'query': query, 'parameterized_query':  
cqlString, 'args': args});

      // if a connection dies at the right place, execute_cql_query never  
returns. make sure the callback gets called.
      var timeoutId = setTimeout(function() {
@@ -366,7 +366,7 @@
      }, this.timeout); // todo: should we disambiguate connection timeout  
vs query timeout?
      self.client.execute_cql_query(cql, ttypes.Compression.NONE,  
function(err, res) {
        if (!timeoutId) {
-        log.warn('query returned after timeout: ' + cql);
+        self.emit('log', 'warn', 'query returned after timeout: ' + cql);
          return;
        } else {
          clearTimeout(timeoutId);
@@ -375,8 +375,12 @@
        end = new Date().getTime();
        diff = (end - start);
        if (self.connectionInfo.log_time) {
-        logTiming.trace('CQL QUERY TIMING', {'query':  
query, 'parameterized_query': cqlString, 'args': args,
-                                             'time': diff});
+        self.emit('log', 'timing', {
+          'query': query,
+          'parameterized_query': cqlString,
+          'args': args,
+          'time': diff
+        });
        }

        if (err) {
@@ -494,11 +498,9 @@
      if (!hostSpec) { continue; }
      var host = hostSpec.split(':');
      if (host.length > 2) {
-      log.warn('malformed host entry "' + hostSpec + '" (skipping)');
-      continue;
-    }
-    log.debug("adding " + hostSpec + " to working node list");
-    this.connections.push(new ConnectionInPool({
+      throw new Error('malformed host entry "' + hostSpec + '"  
(skipping)');
+    }
+    var connection = new ConnectionInPool({
        host: host[0],
        port: (isNaN(host[1])) ? 9160 : host[1],
        keyspace: config.keyspace,
@@ -507,9 +509,16 @@
        use_bigints: self.use_bigints,
        timeout: self.timeout,
        log_time: self.log_time
-    }));
-  }
+    });
+    connection.on('log', function(level, message) {
+      self.emit('log', level, message);
+    });
+    this.connections.push(connection);
+  }
+
+  EventEmitter.call(this);
  };
+util.inherits(PooledConnection, EventEmitter);

  /**
   * increment the current node pointer, skipping over any bad nodes.  has a  
side-effect of resetting
@@ -572,7 +581,7 @@
              if (recoverableError) {
                con.unhealthyAt = new Date().getTime();
                con.taken = false;
-              log.warn('setting unhealthy from execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+              self.emit('log', 'warn', 'setting unhealthy from execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
                // try again.
                self.execute(query, args, callback);
              }
@@ -584,7 +593,7 @@
          // individual connection has failed.
          con.unhealthyAt = new Date().getTime();
          con.taken = false;
-        log.warn('setting unhealthy from catch outside execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+        self.emit('log', 'warn', 'setting unhealthy from catch outside  
execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
          // try again.
          self.execute(query, args, callback);
        }
=======================================
--- /lib/system.js	Sat Jan 21 15:42:46 2012
+++ /lib/system.js	Thu Jan 26 14:40:08 2012
@@ -15,7 +15,6 @@
   *
   */

-var log = require('logmagic').local('node-cassandra-client.system');
  var util = require('util');
  var EventEmitter = require('events').EventEmitter;

@@ -58,7 +57,10 @@
      })(parts[0], parts[1], i);
    }
    this.length = i;
+
+  EventEmitter.call(this);
  };
+util.inherits(Pool, EventEmitter);

  // makes the thrift connection+client.
  Pool.prototype._make_client = function(addr, port, i, callback) {
@@ -72,12 +74,12 @@
    var con = thrift.createConnection(addr, port);
    var self = this;
    con.on('error', function(err) {
-    log.error(err);
+    self.emit('log', 'error', err);
      self.connections[i] = null;
      self.connecting[i] = false;
    });
    var client = thrift.createClient(Cassandra, con);
-  log.info('connected to ' + addr + ':' + port + '@' + i);
+  self.emit('log', 'info', 'connected to ' + addr + ':' + port + '@' + i);
    self.connections[i] = new ThriftConnection(con, client);
    if (callback) {
      callback(this.connections[i]);
=======================================
--- /package.json	Mon Jan 23 11:23:18 2012
+++ /package.json	Thu Jan 26 14:40:08 2012
@@ -28,7 +28,6 @@
    "dependencies": {
      "async": ">= 0.1.12",
      "thrift": ">= 0.6.0-1",
-    "logmagic": ">= 0.1.1",
      "whiskey": ">= 0.6.1",
      "node-uuid": ">= 1.3.3"
    },
=======================================
--- /test/test_driver.js	Mon Jan 16 08:31:48 2012
+++ /test/test_driver.js	Thu Jan 26 14:40:08 2012
@@ -21,7 +21,6 @@
  var EventEmitter = require('events').EventEmitter;
  var http = require('http');

-var logmagic = require('logmagic');
  var async = require('async');

  var BigInteger = require('../lib/bigint').BigInteger;
@@ -86,6 +85,12 @@
      callback(null, con);
    });
    var con = new Connection(connOptions);
+  con.on('log', function(level, message) {
+    if (['cql'].indexOf(level) !== -1) {
+      return;
+    }
+    console.log('log event: %s -- %j', level, message);
+  });
    con.connect(function(err) {
      if (err) {
        callback(err, null);
@@ -942,16 +947,16 @@
    var logObjsCql = [];
    var logObjsTime = [];

-  logmagic.registerSink('cql_sink', function(module, level, message, obj) {
-    logObjsCql.push(arguments);
-  });
-
-  logmagic.registerSink('timing_sink', function(module, level, message,  
obj) {
-    logObjsTime.push(arguments);
-  });
-
-  logmagic.route('node-cassandra-client.driver.cql',  
logmagic.TRACE1, 'cql_sink');
-  logmagic.route('node-cassandra-client.driver.timing',  
logmagic.TRACE1, 'timing_sink');
+  var appendLog = function(level, message) {
+    if (level === 'cql') {
+      logObjsCql.push(message);
+    }
+    if (level === 'timing') {
+      logObjsTime.push(message);
+    }
+  };
+  conn1.on('log', appendLog);
+  conn2.on('log', appendLog);

    conn1.execute('UPDATE CfUgly SET A=1 WHERE KEY=1', [], function(err) {
      var logObj;
@@ -962,10 +967,10 @@
      assert.equal(logObjsTime.length, 0);

      logObj = logObjsCql[0];
-    assert.ok(logObj[3].hasOwnProperty('query'));
-    assert.ok(logObj[3].hasOwnProperty('parameterized_query'));
-    assert.ok(logObj[3].hasOwnProperty('args'));
-    assert.ok(!logObj[3].hasOwnProperty('time'));
+    assert.ok(logObj.hasOwnProperty('query'));
+    assert.ok(logObj.hasOwnProperty('parameterized_query'));
+    assert.ok(logObj.hasOwnProperty('args'));
+    assert.ok(!logObj.hasOwnProperty('time'));

      conn2.execute('SELECT A FROM CfUgly WHERE KEY=1', [], function(err,  
rows) {
        var logObj;
@@ -977,14 +982,12 @@
        assert.equal(logObjsTime.length, 1);
        logObj = logObjsTime[0];

-      assert.ok(logObj[3].hasOwnProperty('query'));
-      assert.ok(logObj[3].hasOwnProperty('parameterized_query'));
-      assert.ok(logObj[3].hasOwnProperty('args'));
-      assert.ok(logObj[3].hasOwnProperty('time'));
-
-      conn1.shutdown();
-      conn2.shutdown();
-      test.finish();
+      assert.ok(logObj.hasOwnProperty('query'));
+      assert.ok(logObj.hasOwnProperty('parameterized_query'));
+      assert.ok(logObj.hasOwnProperty('args'));
+      assert.ok(logObj.hasOwnProperty('time'));
+
+      conn1.shutdown(conn2.shutdown(test.finish));
      });
    });
  };

==============================================================================
Revision: 764dd54a7bc3
Author:   Gary Dusbabek <gd...@gmail.com>
Date:     Mon Jan 30 10:26:55 2012
Log:      Merge commit '9888be187a23a2a2b5ca8a01e7710b7e48f9f00c' into  
16/remove_logmagic

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=764dd54a7bc3

Modified:
  /lib/driver.js
  /test/test_driver.js

=======================================
--- /lib/driver.js	Mon Jan 30 10:16:45 2012
+++ /lib/driver.js	Mon Jan 30 10:26:55 2012
@@ -17,10 +17,6 @@

  /** node.js driver for Cassandra-CQL. */

-var log = require('logmagic').local('node-cassandra-client.driver');
-var logCql = require('logmagic').local('node-cassandra-client.driver.cql');
-var logTiming =  
require('logmagic').local('node-cassandra-client.driver.timing');
-
  var util = require('util');
  var constants = require('constants');
  var EventEmitter = require('events').EventEmitter;
@@ -189,12 +185,14 @@
   */
  var Connection = module.exports.Connection = function(options) {
    options = options || {};
-  log.info('connecting ' + options.host + ':' + options.port);
    this.validators = {};
    this.client = null;
    this.connectionInfo = options;
    this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
+
+  EventEmitter.call(this);
  };
+util.inherits(Connection, EventEmitter);


  /**
@@ -205,6 +203,8 @@
    var self = this,
        timeoutId;

+  self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':'  
+ self.connectionInfo.port);
+
    // build connection here, so that timeouts on bad hosts happen now and  
not in the constructor.
    this.con = thrift.createConnection(self.connectionInfo.host,  
self.connectionInfo.port);
    this.con.on('error', function(err) {
@@ -215,7 +215,7 @@

    this.con.on('close', function() {
      clearTimeout(timeoutId);
-    log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + '  
is closed');
+    self.emit('log', 'info', self.connectionInfo.host + ':' +  
self.connectionInfo.port + ' is closed');
    });

    this.con.on('connect', function() {
@@ -359,7 +359,7 @@
          start, end, diff;

      start = new Date().getTime();
-    logCql.trace('CQL QUERY', {'query': query, 'parameterized_query':  
cqlString, 'args': args});
+    self.emit('log', 'cql', {'query': query, 'parameterized_query':  
cqlString, 'args': args});

      // if a connection dies at the right place, execute_cql_query never  
returns. make sure the callback gets called.
      var timeoutId = setTimeout(function() {
@@ -368,7 +368,7 @@
      }, this.timeout); // todo: should we disambiguate connection timeout  
vs query timeout?
      self.client.execute_cql_query(cql, ttypes.Compression.NONE,  
function(err, res) {
        if (!timeoutId) {
-        log.warn('query returned after timeout: ' + cql);
+        self.emit('log', 'warn', 'query returned after timeout: ' + cql);
          return;
        } else {
          clearTimeout(timeoutId);
@@ -377,8 +377,12 @@
        end = new Date().getTime();
        diff = (end - start);
        if (self.connectionInfo.log_time) {
-        logTiming.trace('CQL QUERY TIMING', {'query':  
query, 'parameterized_query': cqlString, 'args': args,
-                                             'time': diff});
+        self.emit('log', 'timing', {
+          'query': query,
+          'parameterized_query': cqlString,
+          'args': args,
+          'time': diff
+        });
        }

        if (err) {
@@ -496,11 +500,9 @@
      if (!hostSpec) { continue; }
      var host = hostSpec.split(':');
      if (host.length > 2) {
-      log.warn('malformed host entry "' + hostSpec + '" (skipping)');
-      continue;
-    }
-    log.debug("adding " + hostSpec + " to working node list");
-    this.connections.push(new ConnectionInPool({
+      throw new Error('malformed host entry "' + hostSpec + '"  
(skipping)');
+    }
+    var connection = new ConnectionInPool({
        host: host[0],
        port: (isNaN(host[1])) ? 9160 : host[1],
        keyspace: config.keyspace,
@@ -509,9 +511,16 @@
        use_bigints: self.use_bigints,
        timeout: self.timeout,
        log_time: self.log_time
-    }));
-  }
+    });
+    connection.on('log', function(level, message) {
+      self.emit('log', level, message);
+    });
+    this.connections.push(connection);
+  }
+
+  EventEmitter.call(this);
  };
+util.inherits(PooledConnection, EventEmitter);

  /**
   * increment the current node pointer, skipping over any bad nodes.  has a  
side-effect of resetting
@@ -574,7 +583,7 @@
              if (recoverableError) {
                con.unhealthyAt = new Date().getTime();
                con.taken = false;
-              log.warn('setting unhealthy from execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+              self.emit('log', 'warn', 'setting unhealthy from execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
                // try again.
                self.execute(query, args, callback);
              }
@@ -586,7 +595,7 @@
          // individual connection has failed.
          con.unhealthyAt = new Date().getTime();
          con.taken = false;
-        log.warn('setting unhealthy from catch outside execute ' +  
con.connectionInfo.host + ':' + con.connectionInfo.port);
+        self.emit('log', 'warn', 'setting unhealthy from catch outside  
execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
          // try again.
          self.execute(query, args, callback);
        }
=======================================
--- /test/test_driver.js	Mon Jan 30 10:16:45 2012
+++ /test/test_driver.js	Mon Jan 30 10:26:55 2012
@@ -21,7 +21,6 @@
  var EventEmitter = require('events').EventEmitter;
  var http = require('http');

-var logmagic = require('logmagic');
  var async = require('async');

  var BigInteger = require('../lib/bigint').BigInteger;
@@ -86,6 +85,12 @@
      callback(null, con);
    });
    var con = new Connection(connOptions);
+  con.on('log', function(level, message) {
+    if (['cql'].indexOf(level) !== -1) {
+      return;
+    }
+    console.log('log event: %s -- %j', level, message);
+  });
    con.connect(function(err) {
      if (err) {
        callback(err, null);
@@ -941,16 +946,16 @@
    var logObjsCql = [];
    var logObjsTime = [];

-  logmagic.registerSink('cql_sink', function(module, level, message, obj) {
-    logObjsCql.push(arguments);
-  });
-
-  logmagic.registerSink('timing_sink', function(module, level, message,  
obj) {
-    logObjsTime.push(arguments);
-  });
-
-  logmagic.route('node-cassandra-client.driver.cql',  
logmagic.TRACE1, 'cql_sink');
-  logmagic.route('node-cassandra-client.driver.timing',  
logmagic.TRACE1, 'timing_sink');
+  var appendLog = function(level, message) {
+    if (level === 'cql') {
+      logObjsCql.push(message);
+    }
+    if (level === 'timing') {
+      logObjsTime.push(message);
+    }
+  };
+  conn1.on('log', appendLog);
+  conn2.on('log', appendLog);

    conn1.execute('UPDATE CfUgly SET A=1 WHERE KEY=1', [], function(err) {
      var logObj;
@@ -961,10 +966,10 @@
      assert.equal(logObjsTime.length, 0);

      logObj = logObjsCql[0];
-    assert.ok(logObj[3].hasOwnProperty('query'));
-    assert.ok(logObj[3].hasOwnProperty('parameterized_query'));
-    assert.ok(logObj[3].hasOwnProperty('args'));
-    assert.ok(!logObj[3].hasOwnProperty('time'));
+    assert.ok(logObj.hasOwnProperty('query'));
+    assert.ok(logObj.hasOwnProperty('parameterized_query'));
+    assert.ok(logObj.hasOwnProperty('args'));
+    assert.ok(!logObj.hasOwnProperty('time'));

      conn2.execute('SELECT A FROM CfUgly WHERE KEY=1', [], function(err,  
rows) {
        var logObj;
@@ -976,14 +981,12 @@
        assert.equal(logObjsTime.length, 1);
        logObj = logObjsTime[0];

-      assert.ok(logObj[3].hasOwnProperty('query'));
-      assert.ok(logObj[3].hasOwnProperty('parameterized_query'));
-      assert.ok(logObj[3].hasOwnProperty('args'));
-      assert.ok(logObj[3].hasOwnProperty('time'));
-
-      conn1.shutdown();
-      conn2.shutdown();
-      test.finish();
+      assert.ok(logObj.hasOwnProperty('query'));
+      assert.ok(logObj.hasOwnProperty('parameterized_query'));
+      assert.ok(logObj.hasOwnProperty('args'));
+      assert.ok(logObj.hasOwnProperty('time'));
+
+      conn1.shutdown(conn2.shutdown(test.finish));
      });
    });
  };