You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ra...@apache.org on 2014/04/22 15:43:02 UTC

git commit: THRIFT-2405:Node.js Multiplexer tests fail (silently) Client: node Patch: Randy Abernethy

Repository: thrift
Updated Branches:
  refs/heads/master 9b3285311 -> 7201c0d38


THRIFT-2405:Node.js Multiplexer tests fail (silently)
Client: node
Patch: Randy Abernethy

Repairs client side multiplex protocol.


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/7201c0d3
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/7201c0d3
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/7201c0d3

Branch: refs/heads/master
Commit: 7201c0d38ffb1505fdddcc9b65b16621f7e493c3
Parents: 9b32853
Author: ra <ra...@apache.org>
Authored: Tue Apr 22 06:37:37 2014 -0700
Committer: ra <ra...@apache.org>
Committed: Tue Apr 22 06:37:37 2014 -0700

----------------------------------------------------------------------
 lib/nodejs/lib/thrift/connection.js            | 82 +++++++++++++--------
 lib/nodejs/lib/thrift/multiplexed_processor.js |  2 +-
 lib/nodejs/lib/thrift/multiplexed_protocol.js  | 38 +++++-----
 3 files changed, 75 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/7201c0d3/lib/nodejs/lib/thrift/connection.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js
index 36451d5..a3c2d79 100644
--- a/lib/nodejs/lib/thrift/connection.js
+++ b/lib/nodejs/lib/thrift/connection.js
@@ -21,14 +21,16 @@ var util = require('util'),
     net = require('net'),
     tls = require('tls'),
     ttransport = require('./transport'),
-    tprotocol = require('./protocol');
+    tprotocol = require('./protocol'),
+    thrift = require('./thrift');
 
 var binary = require('./binary');
 
 var Connection = exports.Connection = function(stream, options) {
   var self = this;
   EventEmitter.call(this);
-
+  
+  this.seqId2Service = {};
   this.connection = stream;
   this.options = options || {};
   this.transport = this.options.transport || ttransport.TBufferedTransport;
@@ -37,18 +39,21 @@ var Connection = exports.Connection = function(stream, options) {
   this.connected = false;
 
   this._debug = this.options.debug || false;
-  if (this.options.max_attempts
-     && !isNaN(this.options.max_attempts) && this.options.max_attempts > 0) {
+  if (this.options.max_attempts && 
+      !isNaN(this.options.max_attempts) && 
+      this.options.max_attempts > 0) {
      this.max_attempts = +this.options.max_attempts;
   }
   this.retry_max_delay = null;
-  if (this.options.retry_max_delay !== undefined
-     && !isNaN(this.options.retry_max_delay) && this.options.retry_max_delay > 0) {
+  if (this.options.retry_max_delay !== undefined && 
+      !isNaN(this.options.retry_max_delay) && 
+      this.options.retry_max_delay > 0) {
      this.retry_max_delay = this.options.retry_max_delay;
   }
   this.connect_timeout = false;
-  if (this.options.connect_timeout
-     && !isNaN(this.options.connect_timeout) && this.options.connect_timeout > 0) {
+  if (this.options.connect_timeout && 
+      !isNaN(this.options.connect_timeout) && 
+      this.options.connect_timeout > 0) {
      this.connect_timeout = +this.options.connect_timeout;
   }
   this.connection.addListener("connect", function() {
@@ -89,9 +94,9 @@ var Connection = exports.Connection = function(stream, options) {
   this.connection.addListener("error", function(err) {
     // Only emit the error if no-one else is listening on the connection
     // or if someone is listening on us
-    if (self.connection.listeners('error').length === 1
-        || self.listeners('error').length > 0) {
-      self.emit("error", err)
+    if (self.connection.listeners('error').length === 1 || 
+        self.listeners('error').length > 0) {
+      self.emit("error", err);
     }
     // "error" events get turned into exceptions if they aren't listened for.  If the user handled this error
     // then we should try to reconnect.
@@ -114,7 +119,25 @@ var Connection = exports.Connection = function(stream, options) {
         var header = message.readMessageBegin();
         var dummy_seqid = header.rseqid * -1;
         var client = self.client;
-        client._reqs[dummy_seqid] = function(err, success){
+        //The Multiplexed Protocol stores a hash of seqid to service names
+        //  in seqId2Service. If the SeqId is found in the hash we need to
+        //  lookup the appropriate client for this call.
+        //  The connection.client object is a single client object when not
+        //  multiplexing, when using multiplexing it is a service name keyed 
+        //  hash of client objects.
+        //NOTE: The 2 way interdependencies between protocols, transports,
+        //  connections and clients in the Node.js implementation are irregular
+        //  and make the implementation difficult to extend and maintain. We 
+        //  should bring this stuff inline with typical thrift I/O stack 
+        //  operation soon.
+        //  --ra
+        var service_name = self.seqId2Service[header.rseqid];
+        if (service_name) {
+          client = self.client[service_name];
+          delete self.seqId2Service[header.rseqid];
+        }
+        /*jshint -W083 */
+        client._reqs[dummy_seqid] = function(err, success){ 
           transport_with_data.commitPosition();
 
           var callback = client._reqs[header.rseqid];
@@ -123,13 +146,15 @@ var Connection = exports.Connection = function(stream, options) {
             callback(err, success);
           }
         };
-
-        if(!client['recv_' + header.fname]) {
-          // msg was for another serivce, just drop it
-          delete client._reqs[dummy_seqid]
-          return
+        /*jshint +W083 */
+
+        if(client['recv_' + header.fname]) {
+          client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+        } else {
+          delete client._reqs[dummy_seqid];
+          throw new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
+                             "Received a response to an unknown RPC function");
         }
-        client['recv_' + header.fname](message, header.mtype, dummy_seqid);
       }
     }
     catch (e) {
@@ -146,7 +171,7 @@ util.inherits(Connection, EventEmitter);
 
 Connection.prototype.end = function() {
   this.connection.end();
-}
+};
 
 Connection.prototype.initialize_retry_vars = function () {
   this.retry_timer = null;
@@ -162,14 +187,14 @@ Connection.prototype.write = function(data) {
     return;
   }
   this.connection.write(data);
-}
+};
 
 Connection.prototype.connection_gone = function () {
   var self = this;
 
   // If a retry is already in progress, just let that happen
   if (this.retry_timer) {
-	 return;
+    return;
   }
   if (!this.max_attempts) {
     self.emit("close");
@@ -228,7 +253,7 @@ exports.createConnection = function(host, port, options) {
   connection.port = port;
 
   return connection;
-}
+};
 
 exports.createSSLConnection = function(host, port, options) {
   var stream = tls.connect(port, host, options);
@@ -237,7 +262,7 @@ exports.createSSLConnection = function(host, port, options) {
   connection.port = port;
 
   return connection;
-}
+};
 
 
 exports.createClient = function(cls, connection) {
@@ -252,7 +277,7 @@ exports.createClient = function(cls, connection) {
   connection.client = client;
 
   return client;
-}
+};
 
 var child_process = require('child_process');
 var StdIOConnection = exports.StdIOConnection = function(command, options) {
@@ -326,14 +351,13 @@ var StdIOConnection = exports.StdIOConnection = function(command, options) {
       }
     }
   }));
-
 };
 
 util.inherits(StdIOConnection, EventEmitter);
 
 StdIOConnection.prototype.end = function() {
   this.connection.end();
-}
+};
 
 StdIOConnection.prototype.write = function(data) {
   if (!this.connected) {
@@ -341,10 +365,10 @@ StdIOConnection.prototype.write = function(data) {
     return;
   }
   this.connection.write(data);
-}
+};
+
 exports.createStdIOConnection = function(command,options){
   return new StdIOConnection(command,options);
-
 };
 
 exports.createStdIOClient = function(cls,connection) {
@@ -360,4 +384,4 @@ exports.createStdIOClient = function(cls,connection) {
   connection.client = client;
 
   return client;
-}
+};

http://git-wip-us.apache.org/repos/asf/thrift/blob/7201c0d3/lib/nodejs/lib/thrift/multiplexed_processor.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/multiplexed_processor.js b/lib/nodejs/lib/thrift/multiplexed_processor.js
index 2931c4f..1aef4c3 100644
--- a/lib/nodejs/lib/thrift/multiplexed_processor.js
+++ b/lib/nodejs/lib/thrift/multiplexed_processor.js
@@ -38,7 +38,7 @@ MultiplexedProcessor.prototype.process = function(inp, out) {
     var sname = p[0];
     var fname = p[1];
 
-    if (! sname in this.services) {
+    if (! (sname in this.services)) {
         throw new Thrift.TException("TMultiplexedProcessor: Unknown service: " + sname);
     }
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/7201c0d3/lib/nodejs/lib/thrift/multiplexed_protocol.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/multiplexed_protocol.js b/lib/nodejs/lib/thrift/multiplexed_protocol.js
index 9a955ab..68440af 100644
--- a/lib/nodejs/lib/thrift/multiplexed_protocol.js
+++ b/lib/nodejs/lib/thrift/multiplexed_protocol.js
@@ -19,27 +19,31 @@
 var util = require('util');
 var Thrift = require('./thrift');
 
-var Wrapper = exports.Wrapper = function(service_name, protocol) {
+var Wrapper = exports.Wrapper = function(service_name, protocol, connection) {
 
     var MultiplexProtocol = function(trans, strictRead, strictWrite) {
         protocol.call(this, trans, strictRead, strictWrite);
-    }
+    };
     util.inherits(MultiplexProtocol, protocol);
 
     MultiplexProtocol.prototype.writeMessageBegin = function(name, type, seqid) {
-
-        if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY)
-            MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, service_name + ":" + name, type, seqid);
-        else
+        if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) {
+            connection.seqId2Service[seqid] = service_name;
+            MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, 
+                                                                      service_name + ":" + name, 
+                                                                      type, 
+                                                                      seqid);
+        } else {
             MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, name, type, seqid);
-    }
+        }
+    };
 
     return MultiplexProtocol;
-}
+};
 
 var Multiplexer = exports.Multiplexer = function() {
     this.seqid = 0;
-}
+};
 
 Multiplexer.prototype.createClient = function(service_name, cls, connection) {
     if (cls.Client) {
@@ -49,15 +53,15 @@ Multiplexer.prototype.createClient = function(service_name, cls, connection) {
     cls.prototype.new_seqid = function() {
         self.seqid += 1;
         return self.seqid;
-    }
-
+    };
     var client = new cls(new connection.transport(undefined, function(buf) {
         connection.write(buf);
-    }), new Wrapper(service_name, connection.protocol));
-
-
-    // TODO clean this up
-    connection.client = client;
+    }), new Wrapper(service_name, connection.protocol, connection));
+    
+    if (typeof connection.client !== 'object') {
+       connection.client = {};
+    }
+    connection.client[service_name] = client;
 
     return client;
-}
+};