You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ra...@apache.org on 2014/11/16 08:08:29 UTC

thrift git commit: THRIFT-2819 Client Node Patch: Chi Vinh Le

Repository: thrift
Updated Branches:
  refs/heads/master c118db2ce -> 2e091f681


THRIFT-2819
Client Node
Patch: Chi Vinh Le

Adds websocket client to Node with tests


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/2e091f68
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/2e091f68
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/2e091f68

Branch: refs/heads/master
Commit: 2e091f681b48562f7b9706c77243787901963e44
Parents: c118db2
Author: Randy Abernethy <ra...@apache.org>
Authored: Sat Nov 15 23:05:22 2014 -0800
Committer: Randy Abernethy <ra...@apache.org>
Committed: Sat Nov 15 23:05:22 2014 -0800

----------------------------------------------------------------------
 lib/nodejs/lib/thrift/index.js         |   5 +
 lib/nodejs/lib/thrift/web_server.js    |   8 +-
 lib/nodejs/lib/thrift/ws_connection.js | 296 ++++++++++++++++++++++++++++
 lib/nodejs/package.json                |  52 ++---
 lib/nodejs/test/testAll.sh             |  22 +++
 lib/nodejs/test/ws_client.js           |  81 ++++++++
 6 files changed, 438 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/lib/thrift/index.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js
index ea7fde0..9b53dd0 100644
--- a/lib/nodejs/lib/thrift/index.js
+++ b/lib/nodejs/lib/thrift/index.js
@@ -31,6 +31,11 @@ exports.HttpConnection = httpConnection.HttpConnection;
 exports.createHttpConnection = httpConnection.createHttpConnection;
 exports.createHttpClient = httpConnection.createHttpClient;
 
+var wsConnection = require('./ws_connection');
+exports.WSConnection = wsConnection.WSConnection;
+exports.createWSConnection = wsConnection.createWSConnection;
+exports.createWSClient = wsConnection.createWSClient;
+
 var server = require('./server');
 exports.createServer = server.createServer;
 exports.createMultiplexServer = server.createMultiplexServer;

http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/lib/thrift/web_server.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/web_server.js b/lib/nodejs/lib/thrift/web_server.js
index 40fc1ae..926b72c 100644
--- a/lib/nodejs/lib/thrift/web_server.js
+++ b/lib/nodejs/lib/thrift/web_server.js
@@ -27,7 +27,7 @@ var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcess
 var TTransport = require('./transport');
 var TBufferedTransport = require('./transport').TBufferedTransport;
 var TBinaryProtocol = require('./protocol').TBinaryProtocol;
-
+var TJSONProtocol = require('./protocol').TJSONProtocol;
 
 // WSFrame constructor and prototype 
 /////////////////////////////////////////////////////////////////////
@@ -452,11 +452,12 @@ exports.createWebServer = function(options) {
   ///////////////////////////////////////////////////
   function processWS(data, socket, svc, binEncoding) {
     svc.transport.receiver(function(transportWithData) {
+      var binary = svc.protocol != TJSONProtocol;
       var input = new svc.protocol(transportWithData);
       var output = new svc.protocol(new svc.transport(undefined, function(buf) {
         try {
           var frame = wsFrame.encode(buf, null, binEncoding);
-          socket.write(frame);
+          socket.write(frame, null, binary);
         } catch (err) {
           //TODO: Add better error processing
         }
@@ -519,10 +520,11 @@ exports.createWebServer = function(options) {
                    "\r\n");
     //Handle WebSocket traffic
     var data = null;
+    var binary = svc.protocol != TJSONProtocol;
     socket.on('data', function(frame) {
       try {
         while (frame) {
-          var result = wsFrame.decode(frame);
+          var result = wsFrame.decode(frame, null, binary);
           //Prepend any existing decoded data
           if (data) {
             if (result.data) {

http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/lib/thrift/ws_connection.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/lib/thrift/ws_connection.js b/lib/nodejs/lib/thrift/ws_connection.js
new file mode 100644
index 0000000..54dd936
--- /dev/null
+++ b/lib/nodejs/lib/thrift/ws_connection.js
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+var util = require('util');
+var WebSocket = require('ws');
+var EventEmitter = require("events").EventEmitter;
+var thrift = require('./thrift');
+var ttransport = require('./transport');
+var tprotocol = require('./protocol');
+
+/**
+ * @class
+ * @name WSConnectOptions
+ * @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc).
+ * @property {string} protocol - The Thrift serialization protocol to use (TJSONProtocol, etc.).
+ * @property {string} path - The URL path to connect to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.).
+ * @property {object} headers - A standard Node.js header hash, an object hash containing key/value
+ *        pairs where the key is the header name string and the value is the header value string.
+ * @property {boolean} secure - True causes the connection to use wss, otherwise ws is used.
+ * @property {object} wsOptions - Options passed on to WebSocket.
+ * @example
+ *     //Use a secured websocket connection
+ *     //  uses the buffered transport layer, uses the JSON protocol and directs RPC traffic
+ *     //  to wss://thrift.example.com:9090/hello
+ *     var thrift = require('thrift');
+ *     var options = {
+ *        transport: thrift.TBufferedTransport,
+ *        protocol: thrift.TJSONProtocol,
+ *        path: "/hello",
+ *        secure: true
+ *     };
+ *     var con = thrift.createWSConnection("thrift.example.com", 9090, options);
+ *     con.open()
+ *     var client = thrift.createWSClient(myService, connection);
+ *     client.myServiceFunction();
+ *     con.close()
+ */
+
+/**
+ * Initializes a Thrift WSConnection instance (use createWSConnection() rather than
+ *    instantiating directly).
+ * @constructor
+ * @param {string} host - The host name or IP to connect to.
+ * @param {number} port - The TCP port to connect to.
+ * @param {WSConnectOptions} options - The configuration options to use.
+ * @throws {error} Exceptions other than ttransport.InputBufferUnderrunError are rethrown
+ * @event {error} The "error" event is fired when a Node.js error event occurs during
+ *     request or response processing, in which case the node error is passed on. An "error"
+ *     event may also be fired when the connectison can not map a response back to the
+ *     appropriate client (an internal error), generating a TApplicationException.
+ * @classdesc WSConnection objects provide Thrift end point transport
+ *     semantics implemented using Websockets.
+ * @see {@link createWSConnection}
+ */
+var WSConnection = exports.WSConnection = function(host, port, options) {
+  //Initialize the emitter base object
+  EventEmitter.call(this);
+
+  //Set configuration
+  var self = this;
+  this.options = options || {};
+  this.host = host;
+  this.port = port;
+  this.secure = this.options.secure || false;
+  this.transport = this.options.transport || ttransport.TBufferedTransport;
+  this.protocol = this.options.protocol || tprotocol.TJSONProtocol;
+  this.path = this.options.path;
+  this.send_pending = [];
+
+  //The sequence map is used to map seqIDs back to the 
+  //  calling client in multiplexed scenarios
+  this.seqId2Service = {};
+
+  //Prepare WebSocket options
+  this.wsOptions = {
+    host: this.host,
+    port: this.port || 80,
+    path: this.options.path || '/',
+    headers: this.options.headers || {}
+  };
+  for (var attrname in this.options.wsOptions) {
+    this.wsOptions[attrname] = this.options.wsOptions[attrname];
+  }
+};
+util.inherits(WSConnection, EventEmitter);
+
+WSConnection.prototype.__reset = function() {
+  this.socket = null; //The web socket
+  this.send_pending = []; //Buffers/Callback pairs waiting to be sent
+};
+
+WSConnection.prototype.__onOpen = function() {
+  var self = this;
+  this.emit("open");
+  if (this.send_pending.length > 0) {
+    //If the user made calls before the connection was fully 
+    //open, send them now
+    this.send_pending.forEach(function(data) {
+      self.socket.send(data);
+    });
+    this.send_pending = [];
+  }
+};
+
+WSConnection.prototype.__onClose = function(evt) {
+  this.emit("close");
+  this.__reset();
+};
+
+WSConnection.prototype.__decodeCallback = function(transport_with_data) {
+  var proto = new this.protocol(transport_with_data);
+  try {
+    while (true) {
+      var header = proto.readMessageBegin();
+      var dummy_seqid = header.rseqid * -1;
+      var client = this.client;
+      //The Multiplexed Protocol stores a hash of seqid to service names
+      //  in seqId2Service. If the SeqId is found in the hash we need to
+      //  lookup the appropriate client for this call.
+      //  The client var is a single client object when not multiplexing, 
+      //  when using multiplexing it is a service name keyed hash of client
+      //  objects.
+      //NOTE: The 2 way interdependencies between protocols, transports,
+      //  connections and clients in the Node.js implementation are irregular
+      //  and make the implementation difficult to extend and maintain. We 
+      //  should bring this stuff inline with typical thrift I/O stack
+      //  operation soon.
+      //  --ra
+      var service_name = this.seqId2Service[header.rseqid];
+      if (service_name) {
+        client = this.client[service_name];
+        delete this.seqId2Service[header.rseqid];
+      }
+      /*jshint -W083 */
+      client._reqs[dummy_seqid] = function(err, success) {
+        transport_with_data.commitPosition();
+        var clientCallback = client._reqs[header.rseqid];
+        delete client._reqs[header.rseqid];
+        if (clientCallback) {
+          clientCallback(err, success);
+        }
+      };
+      /*jshint +W083 */
+      if (client['recv_' + header.fname]) {
+        client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
+      } else {
+        delete client._reqs[dummy_seqid];
+        this.emit("error",
+          new thrift.TApplicationException(
+            thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
+            "Received a response to an unknown RPC function"));
+      }
+    }
+  } catch (e) {
+    if (e instanceof ttransport.InputBufferUnderrunError) {
+      transport_with_data.rollbackPosition();
+    } else {
+      throw e;
+    }
+  }
+};
+
+WSConnection.prototype.__onData = function(data) {
+  if (Object.prototype.toString.call(data) == "[object ArrayBuffer]") {
+    data = new Uint8Array(data);
+  }
+  var buf = new Buffer(data);
+  this.transport.receiver(this.__decodeCallback.bind(this))(buf);
+
+};
+WSConnection.prototype.__onMessage = function(evt) {
+
+  this.__onData(evt.data);
+};
+
+WSConnection.prototype.__onError = function(evt) {
+  this.emit("error", evt);
+  this.socket.close();
+};
+
+/**
+ * Returns true if the transport is open
+ * @readonly
+ * @returns {boolean}
+ */
+WSConnection.prototype.isOpen = function() {
+  return this.socket && this.socket.readyState == this.socket.OPEN;
+};
+
+/**
+ * Opens the transport connection
+ */
+WSConnection.prototype.open = function() {
+  //If OPEN/CONNECTING/CLOSING ignore additional opens
+  if (this.socket && this.socket.readyState != this.socket.CLOSED) {
+    return;
+  }
+  //If there is no socket or the socket is closed:
+  this.socket = new WebSocket(this.uri(), "", this.wsOptions);
+  this.socket.binaryType = 'arraybuffer';
+  this.socket.onopen = this.__onOpen.bind(this);
+  this.socket.onmessage = this.__onMessage.bind(this);
+  this.socket.onerror = this.__onError.bind(this);
+  this.socket.onclose = this.__onClose.bind(this);
+};
+
+/**
+ * Closes the transport connection
+ */
+WSConnection.prototype.close = function() {
+  this.socket.close();
+};
+
+/**
+ * Return URI for the connection
+ * @returns {string} URI
+ */
+
+WSConnection.prototype.uri = function() {
+  var schema = this.secure ? 'wss' : 'ws';
+  var port = '';
+  var path = this.path || '/';
+  var host = this.host;
+
+  // avoid port if default for schema
+  if (this.port && (('wss' == schema && this.port != 443) ||
+    ('ws' == schema && this.port != 80))) {
+    port = ':' + this.port;
+  }
+
+  return schema + '://' + host + port + path;
+};
+
+/**
+ * Writes Thrift message data to the connection
+ * @param {Buffer} data - A Node.js Buffer containing the data to write
+ * @returns {void} No return value.
+ * @event {error} the "error" event is raised upon request failure passing the
+ *     Node.js error object to the listener.
+ */
+WSConnection.prototype.write = function(data) {
+  if (this.isOpen()) {
+    //Send data and register a callback to invoke the client callback
+    this.socket.send(data);
+  } else {
+    //Queue the send to go out __onOpen
+    this.send_pending.push(data);
+  }
+};
+
+/**
+ * Creates a new WSConnection object, used by Thrift clients to connect
+ *    to Thrift HTTP based servers.
+ * @param {string} host - The host name or IP to connect to.
+ * @param {number} port - The TCP port to connect to.
+ * @param {WSConnectOptions} options - The configuration options to use.
+ * @returns {WSConnection} The connection object.
+ * @see {@link WSConnectOptions}
+ */
+exports.createWSConnection = function(host, port, options) {
+  return new WSConnection(host, port, options);
+};
+
+/**
+ * Creates a new client object for the specified Thrift service.
+ * @param {object} cls - The module containing the service client
+ * @param {WSConnection} wsConnection - The connection to use.
+ * @returns {object} The client object.
+ * @see {@link createWSConnection}
+ */
+exports.createWSClient = function(cls, wsConnection) {
+  if (cls.Client) {
+    cls = cls.Client;
+  }
+  wsConnection.client =
+    new cls(new wsConnection.transport(undefined, function(buf) {
+        wsConnection.write(buf);
+      }),
+      wsConnection.protocol);
+  return wsConnection.client;
+};

http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/package.json
----------------------------------------------------------------------
diff --git a/lib/nodejs/package.json b/lib/nodejs/package.json
index d7e39e4..51216d2 100755
--- a/lib/nodejs/package.json
+++ b/lib/nodejs/package.json
@@ -2,38 +2,44 @@
   "name": "thrift",
   "description": "node.js bindings for the Apache Thrift RPC system",
   "homepage": "http://thrift.apache.org/",
-  "repository":
-    { "type" : "git",
-      "url" : "https://git-wip-us.apache.org/repos/asf/thrift.git"
-    },
+  "repository": {
+    "type": "git",
+    "url": "https://git-wip-us.apache.org/repos/asf/thrift.git"
+  },
   "version": "1.0.0-dev",
-  "author":
-    { "name": "Apache Thrift Developers",
-      "email": "dev@thrift.apache.org",
-      "url": "http://thrift.apache.org"
-    },
-  "licenses":
-    [ { "type": "Apache-2.0",
-        "url": "http://www.apache.org/licenses/LICENSE-2.0"
-      }
-    ],
-  "bugs":
-    { "mail": "dev@thrift.apache.org",
-      "url": "https://issues.apache.org/jira/browse/THRIFT"
-    },
-  "directories" : { "lib" : "./lib/thrift" },
+  "author": {
+    "name": "Apache Thrift Developers",
+    "email": "dev@thrift.apache.org",
+    "url": "http://thrift.apache.org"
+  },
+  "licenses": [
+    {
+      "type": "Apache-2.0",
+      "url": "http://www.apache.org/licenses/LICENSE-2.0"
+    }
+  ],
+  "bugs": {
+    "mail": "dev@thrift.apache.org",
+    "url": "https://issues.apache.org/jira/browse/THRIFT"
+  },
+  "directories": {
+    "lib": "./lib/thrift"
+  },
   "main": "./lib/thrift",
-  "engines": { "node": ">= 0.2.4" },
+  "engines": {
+    "node": ">= 0.2.4"
+  },
   "dependencies": {
     "node-int64": "~0.3.0",
-	"q": "1.0.x",
-    "nodeunit": "~0.8.0"
+    "q": "1.0.x",
+    "nodeunit": "~0.8.0",
+    "ws": "~0.4.32"
   },
   "devDependencies": {
     "connect": "2.7.x",
     "commander": "2.1.x"
   },
   "scripts": {
-    "test" : "test/testAll.sh"
+    "test": "test/testAll.sh"
   }
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/test/testAll.sh
----------------------------------------------------------------------
diff --git a/lib/nodejs/test/testAll.sh b/lib/nodejs/test/testAll.sh
index e09c783..4008eec 100755
--- a/lib/nodejs/test/testAll.sh
+++ b/lib/nodejs/test/testAll.sh
@@ -59,6 +59,18 @@ testHttpClientServer()
   return $RET
 }
 
+testWSClientServer()
+{
+  echo "   Testing WebSocket Client/Server with protocol $1 and transport $2 $3";
+  RET=0
+  node ${DIR}/http_server.js -p $1 -t $2 $3 &
+  SERVERPID=$!
+  sleep 1
+  node ${DIR}/ws_client.js -p $1 -t $2 $3 || RET=1
+  kill -9 $SERVERPID || RET=1
+  return $RET
+}
+
 
 TESTOK=0
 
@@ -104,4 +116,14 @@ testHttpClientServer binary framed || TESTOK=1
 testHttpClientServer json buffered --promise || TESTOK=1
 testHttpClientServer binary framed --ssl || TESTOK=1
 
+#WebSocket tests
+testWSClientServer compact buffered || TESTOK=1
+testWSClientServer compact framed || TESTOK=1
+testWSClientServer json buffered || TESTOK=1
+testWSClientServer json framed || TESTOK=1
+testWSClientServer binary buffered || TESTOK=1
+testWSClientServer binary framed || TESTOK=1
+testWSClientServer json buffered --promise || TESTOK=1
+testWSClientServer binary framed --ssl || TESTOK=1
+
 exit $TESTOK

http://git-wip-us.apache.org/repos/asf/thrift/blob/2e091f68/lib/nodejs/test/ws_client.js
----------------------------------------------------------------------
diff --git a/lib/nodejs/test/ws_client.js b/lib/nodejs/test/ws_client.js
new file mode 100644
index 0000000..4573246
--- /dev/null
+++ b/lib/nodejs/test/ws_client.js
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//This is the client side test for the standard Apache Thrift
+//"ThriftTest" suite. This client will test any protocol/transport
+//combination specified on the command line.
+
+var fs = require('fs');
+var assert = require('assert');
+var thrift = require('thrift');
+var ThriftTest = require('./gen-nodejs/ThriftTest');
+var ThriftTestDriver = require('./thrift_test_driver').ThriftTestDriver;
+var ThriftTestDriverPromise = require('./thrift_test_driver_promise').ThriftTestDriver;
+
+var program = require('commander');
+
+program
+  .option('-p, --protocol <protocol>', 'Set thrift protocol (binary|json) [protocol]')
+  .option('-t, --transport <transport>', 'Set thrift transport (buffered|framed) [transport]')
+  .option('--ssl', 'use wss instead of ws')
+  .option('--promise', 'test with promise style functions')
+  .parse(process.argv);
+
+var protocol = thrift.TBinaryProtocol;
+if (program.protocol === "json") {
+  protocol = thrift.TJSONProtocol;
+} 
+
+var transport =  thrift.TBufferedTransport;
+if (program.transport === "framed") {
+  transport = thrift.TFramedTransport;
+}
+
+var options = {
+   transport: transport,
+   protocol: protocol,
+   path: "/test"
+};
+
+if (program.ssl) {
+  options.wsOptions = { rejectUnauthorized: false };
+  options.secure = true;
+} 
+
+var connection = thrift.createWSConnection("localhost", 9090, options);
+connection.open();
+
+var client = thrift.createWSClient(ThriftTest, connection);
+
+connection.on('error', function(err) {
+  assert(false, err);
+});
+
+var testDriver = ThriftTestDriver;
+if (program.promise) {
+  console.log("    --Testing promise style client");
+  testDriver = ThriftTestDriverPromise;
+} 
+testDriver(client, function (status) {
+  console.log(status);
+  process.exit(0);
+});
+
+// to make it also run on expresso
+exports.expressoTest = function() {};