You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/27 00:54:36 UTC

[21/50] [abbrv] tinkerpop git commit: driver_remote_connection has been simplified again and authentication fixed and working. ta-to-the-da.

driver_remote_connection has been simplified again and authentication fixed and working. ta-to-the-da.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d299cbcd
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d299cbcd
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d299cbcd

Branch: refs/heads/master
Commit: d299cbcd73db12fae9bafaf9714c47fff51876b2
Parents: ba8bb60
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Aug 25 08:47:27 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Aug 25 08:47:44 2016 -0600

----------------------------------------------------------------------
 .../driver/driver_remote_connection.py          | 199 +++++++++----------
 1 file changed, 95 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d299cbcd/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index 50a9944..165eed6 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -16,6 +16,7 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 """
+import base64
 import json
 import uuid
 from tornado import gen
@@ -34,75 +35,26 @@ class GremlinServerError(Exception):
 
 
 class DriverRemoteConnection(RemoteConnection):
-    def __init__(self, url, traversal_source, loop=None, username="", password=""):
+    def __init__(self, url, traversal_source, username="", password="", loop=None):
         super(DriverRemoteConnection, self).__init__(url, traversal_source)
-        if loop is None:
-            self._loop = ioloop.IOLoop.current()
-        self._ws = self._loop.run_sync(lambda: websocket.websocket_connect(self.url))
         self._username = username
         self._password = password
-
-    def submit(self,
-               bytecode,
-               op="bytecode",
-               processor="traversal"):
+        if loop is None: self._loop = ioloop.IOLoop.current()
+        self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url))
+
+    def submit(self, bytecode):
+        '''
+        :param bytecode: the bytecode of a traversal to submit to the RemoteConnection
+        :return: a RemoteTraversal with RemoteTraversalSideEffects
+        '''
         request_id = str(uuid.uuid4())
-        traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(bytecode, request_id))
-        keys_lambda = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
-        value_lambda = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
-        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys_lambda, value_lambda))
-
-    @gen.coroutine
-    def submit_traversal_bytecode(self, bytecode, request_id):
-        message = self._get_traversal_bytecode_message(bytecode, request_id)
-        traversers = yield self._execute_message(message)
-        raise gen.Return(traversers)
-
-    @gen.coroutine
-    def submit_sideEffect_keys(self, request_id):
-        message = self._get_sideEffect_keys_message(request_id)
-        keys = yield self._execute_message(message)
-        raise gen.Return(set(keys))
+        traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
+        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
+        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
+        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value))
 
     @gen.coroutine
-    def submit_sideEffect_value(self, request_id, key):
-        message = self._get_sideEffect_value_message(request_id, key)
-        side_effects = yield self._execute_message(message)
-        raise gen.Return(side_effects)
-
-    @gen.coroutine
-    def _execute_message(self, message):
-        if self._ws.protocol is None:
-            self._ws = yield websocket.websocket_connect(self.url)
-        self._ws.write_message(message, binary=True)
-        resp = Response(self._ws, self._username, self._password)
-        results = None
-        while True:
-            msg = yield resp.receive()
-            if msg is None:
-                break
-            # on first message, get the right result data structure
-            if None == results:
-                if "list" == msg[0]:
-                    results = []
-                elif "set" == msg[0]:
-                    results = set()
-                elif "map" == msg[0]:
-                    results = {}
-                else:
-                    results = []
-            # updating a map is different than a list or a set
-            if isinstance(results, dict):
-                for item in msg[1]:
-                    results.update(item)
-            else:
-                results += msg[1]
-        raise gen.Return([] if None == results else results)
-
-    def close(self):
-        self._ws.close()
-
-    def _get_traversal_bytecode_message(self, bytecode, request_id):
+    def submit_traversal_bytecode(self, request_id, bytecode):
         message = {
             "requestId": {
                 "@type": "g:UUID",
@@ -115,9 +67,11 @@ class DriverRemoteConnection(RemoteConnection):
                 "aliases": {"g": self.traversal_source}
             }
         }
-        return DriverRemoteConnection._finalize_message(message)
+        traversers = yield self._execute_message(message)
+        raise gen.Return(traversers)
 
-    def _get_sideEffect_keys_message(self, request_id):
+    @gen.coroutine
+    def submit_sideEffect_keys(self, request_id):
         message = {
             "requestId": {
                 "@type": "g:UUID",
@@ -132,9 +86,11 @@ class DriverRemoteConnection(RemoteConnection):
                 }
             }
         }
-        return DriverRemoteConnection._finalize_message(message)
+        keys = yield self._execute_message(message)
+        raise gen.Return(set(keys))
 
-    def _get_sideEffect_value_message(self, request_id, key):
+    @gen.coroutine
+    def submit_sideEffect_value(self, request_id, key):
         message = {
             "requestId": {
                 "@type": "g:UUID",
@@ -151,60 +107,95 @@ class DriverRemoteConnection(RemoteConnection):
                 "aliases": {"g": self.traversal_source}
             }
         }
-        return DriverRemoteConnection._finalize_message(message)
+        value = yield self._execute_message(message)
+        raise gen.Return(value)
+
+    @gen.coroutine
+    def _execute_message(self, send_message):
+        send_message = b"".join([b"\x21",
+                                 b"application/vnd.gremlin-v2.0+json",
+                                 json.dumps(send_message).encode("utf-8")])
+        if self._websocket.protocol is None:
+            self._websocket = yield websocket.websocket_connect(self.url)
+        self._websocket.write_message(send_message, binary=True)
+        response = Response(self._websocket, self._username, self._password)
+        results = None
+        while True:
+            recv_message = yield response.receive()
+            if recv_message is None:
+                break
+
+            # on first message, get the right result data structure
+            if None == results:
+                if "list" == recv_message[0]:
+                    results = []
+                elif "set" == recv_message[0]:
+                    results = set()
+                elif "map" == recv_message[0]:
+                    results = {}
+                elif "none" == recv_message[0]:
+                    results = None
+                else:
+                    results = []
+
+            # if there is no update to a structure, then the item is the result
+            if results is None:
+                results = recv_message[1][0]
+            # updating a map is different than a list or a set
+            elif isinstance(results, dict):
+                for item in recv_message[1]:
+                    results.update(item)
+            # flat add list to result list
+            else:
+                results += recv_message[1]
+        raise gen.Return([] if None == results else results)
 
-    @staticmethod
-    def _finalize_message(message):
-        message = json.dumps(message)
-        mime_type = b"application/vnd.gremlin-v2.0+json"
-        mime_len = b"\x21"
-        return b"".join([mime_len, mime_type, message.encode("utf-8")])
+    def close(self):
+        self._websocket.close()
 
 
 class Response:
-    def __init__(self, ws, username, password):
-        self._ws = ws
-        self._closed = False
+    def __init__(self, websocket, username, password):
+        self._websocket = websocket
         self._username = username
         self._password = password
-
-    def _authenticate(self, username, password):
-        auth = b"".join([b"\x00", username.encode("utf-8"),
-                         b"\x00", password.encode("utf-8")])
-        message = {
-            "requestId": {
-                "@type": "g:UUID",
-                "@value": str(uuid.uuid4())
-            },
-            "op": "authentication",
-            "processor": "traversal",
-            "args": {
-                "sasl": base64.b64encode(auth).decode()
-            }
-        }
-        self._ws.send_message(DriverRemoteConnection._finalize_message(message), binary=True)
+        self._closed = False
 
     @gen.coroutine
     def receive(self):
         if self._closed:
             return
-        data = yield self._ws.read_message()
-        message = json.loads(data)
-        status_code = message["status"]["code"]
-        data = message["result"]["data"]
-        msg = message["status"]["message"]
-        meta = message["result"]["meta"]
-        aggregateTo = "list" if "aggregateTo" not in meta else meta["aggregateTo"]
+        recv_message = yield self._websocket.read_message()
+        recv_message = json.loads(recv_message)
+        status_code = recv_message["status"]["code"]
+        aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list")
 
+        # authentication required (407): send the SASL credentials, then read the next response
         if status_code == 407:
-            self._authenticate(self._username, self._password)
-            yield self.receive()
+            self._websocket.write_message(
+                b"".join([b"\x21",
+                          b"application/vnd.gremlin-v2.0+json",
+                          json.dumps({
+                              "requestId": {
+                                  "@type": "g:UUID",
+                                  "@value": str(uuid.uuid4())
+                              },
+                              "op": "authentication",
+                              "processor": "traversal",
+                              "args": {
+                                  "sasl": base64.b64encode(
+                                      b"".join([b"\x00", self._username.encode("utf-8"),
+                                                b"\x00", self._password.encode("utf-8")])).decode()
+                              }
+                          }).encode("utf-8")]), binary=True)
+            results = yield self.receive()
+            raise gen.Return(results)
         elif status_code == 204:
             self._closed = True
             return
         elif status_code in [200, 206]:
             results = []
-            for item in data:
+            for item in recv_message["result"]["data"]:
                 results.append(GraphSONReader._objectify(item))
             if status_code == 200:
                 self._closed = True
@@ -212,4 +203,4 @@ class Response:
         else:
             self._closed = True
             raise GremlinServerError(
-                "{0}: {1}".format(status_code, msg))
+                "{0}: {1}".format(status_code, recv_message["status"]["message"]))