You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/08/25 14:47:49 UTC
tinkerpop git commit: driver_remote_connection has been simplified
again and authentification fixed and working. ta-to-the-da.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1278 ba8bb6097 -> d299cbcd7
driver_remote_connection has been simplified again and authentification fixed and working. ta-to-the-da.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d299cbcd
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d299cbcd
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d299cbcd
Branch: refs/heads/TINKERPOP-1278
Commit: d299cbcd73db12fae9bafaf9714c47fff51876b2
Parents: ba8bb60
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Aug 25 08:47:27 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Aug 25 08:47:44 2016 -0600
----------------------------------------------------------------------
.../driver/driver_remote_connection.py | 199 +++++++++----------
1 file changed, 95 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d299cbcd/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index 50a9944..165eed6 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -16,6 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
+import base64
import json
import uuid
from tornado import gen
@@ -34,75 +35,26 @@ class GremlinServerError(Exception):
class DriverRemoteConnection(RemoteConnection):
- def __init__(self, url, traversal_source, loop=None, username="", password=""):
+ def __init__(self, url, traversal_source, username="", password="", loop=None):
super(DriverRemoteConnection, self).__init__(url, traversal_source)
- if loop is None:
- self._loop = ioloop.IOLoop.current()
- self._ws = self._loop.run_sync(lambda: websocket.websocket_connect(self.url))
self._username = username
self._password = password
-
- def submit(self,
- bytecode,
- op="bytecode",
- processor="traversal"):
+ if loop is None: self._loop = ioloop.IOLoop.current()
+ self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url))
+
+ def submit(self, bytecode):
+ '''
+ :param bytecode: the bytecode of a traversal to submit to the RemoteConnection
+ :return: a RemoteTraversal with RemoteTraversalSideEffects
+ '''
request_id = str(uuid.uuid4())
- traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(bytecode, request_id))
- keys_lambda = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
- value_lambda = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
- return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys_lambda, value_lambda))
-
- @gen.coroutine
- def submit_traversal_bytecode(self, bytecode, request_id):
- message = self._get_traversal_bytecode_message(bytecode, request_id)
- traversers = yield self._execute_message(message)
- raise gen.Return(traversers)
-
- @gen.coroutine
- def submit_sideEffect_keys(self, request_id):
- message = self._get_sideEffect_keys_message(request_id)
- keys = yield self._execute_message(message)
- raise gen.Return(set(keys))
+ traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
+ side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
+ side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
+ return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value))
@gen.coroutine
- def submit_sideEffect_value(self, request_id, key):
- message = self._get_sideEffect_value_message(request_id, key)
- side_effects = yield self._execute_message(message)
- raise gen.Return(side_effects)
-
- @gen.coroutine
- def _execute_message(self, message):
- if self._ws.protocol is None:
- self._ws = yield websocket.websocket_connect(self.url)
- self._ws.write_message(message, binary=True)
- resp = Response(self._ws, self._username, self._password)
- results = None
- while True:
- msg = yield resp.receive()
- if msg is None:
- break
- # on first message, get the right result data structure
- if None == results:
- if "list" == msg[0]:
- results = []
- elif "set" == msg[0]:
- results = set()
- elif "map" == msg[0]:
- results = {}
- else:
- results = []
- # updating a map is different than a list or a set
- if isinstance(results, dict):
- for item in msg[1]:
- results.update(item)
- else:
- results += msg[1]
- raise gen.Return([] if None == results else results)
-
- def close(self):
- self._ws.close()
-
- def _get_traversal_bytecode_message(self, bytecode, request_id):
+ def submit_traversal_bytecode(self, request_id, bytecode):
message = {
"requestId": {
"@type": "g:UUID",
@@ -115,9 +67,11 @@ class DriverRemoteConnection(RemoteConnection):
"aliases": {"g": self.traversal_source}
}
}
- return DriverRemoteConnection._finalize_message(message)
+ traversers = yield self._execute_message(message)
+ raise gen.Return(traversers)
- def _get_sideEffect_keys_message(self, request_id):
+ @gen.coroutine
+ def submit_sideEffect_keys(self, request_id):
message = {
"requestId": {
"@type": "g:UUID",
@@ -132,9 +86,11 @@ class DriverRemoteConnection(RemoteConnection):
}
}
}
- return DriverRemoteConnection._finalize_message(message)
+ keys = yield self._execute_message(message)
+ raise gen.Return(set(keys))
- def _get_sideEffect_value_message(self, request_id, key):
+ @gen.coroutine
+ def submit_sideEffect_value(self, request_id, key):
message = {
"requestId": {
"@type": "g:UUID",
@@ -151,60 +107,95 @@ class DriverRemoteConnection(RemoteConnection):
"aliases": {"g": self.traversal_source}
}
}
- return DriverRemoteConnection._finalize_message(message)
+ value = yield self._execute_message(message)
+ raise gen.Return(value)
+
+ @gen.coroutine
+ def _execute_message(self, send_message):
+ send_message = b"".join([b"\x21",
+ b"application/vnd.gremlin-v2.0+json",
+ json.dumps(send_message).encode("utf-8")])
+ if self._websocket.protocol is None:
+ self._websocket = yield websocket.websocket_connect(self.url)
+ self._websocket.write_message(send_message, binary=True)
+ response = Response(self._websocket, self._username, self._password)
+ results = None
+ while True:
+ recv_message = yield response.receive()
+ if recv_message is None:
+ break
+
+ # on first message, get the right result data structure
+ if None == results:
+ if "list" == recv_message[0]:
+ results = []
+ elif "set" == recv_message[0]:
+ results = set()
+ elif "map" == recv_message[0]:
+ results = {}
+ elif "none" == recv_message[0]:
+ results = None
+ else:
+ results = []
+
+ # if there is no update to a structure, then the item is the result
+ if results is None:
+ results = recv_message[1][0]
+ # updating a map is different than a list or a set
+ elif isinstance(results, dict):
+ for item in recv_message[1]:
+ results.update(item)
+ # flat add list to result list
+ else:
+ results += recv_message[1]
+ raise gen.Return([] if None == results else results)
- @staticmethod
- def _finalize_message(message):
- message = json.dumps(message)
- mime_type = b"application/vnd.gremlin-v2.0+json"
- mime_len = b"\x21"
- return b"".join([mime_len, mime_type, message.encode("utf-8")])
+ def close(self):
+ self._websocket.close()
class Response:
- def __init__(self, ws, username, password):
- self._ws = ws
- self._closed = False
+ def __init__(self, websocket, username, password):
+ self._websocket = websocket
self._username = username
self._password = password
-
- def _authenticate(self, username, password):
- auth = b"".join([b"\x00", username.encode("utf-8"),
- b"\x00", password.encode("utf-8")])
- message = {
- "requestId": {
- "@type": "g:UUID",
- "@value": str(uuid.uuid4())
- },
- "op": "authentication",
- "processor": "traversal",
- "args": {
- "sasl": base64.b64encode(auth).decode()
- }
- }
- self._ws.send_message(DriverRemoteConnection._finalize_message(message), binary=True)
+ self._closed = False
@gen.coroutine
def receive(self):
if self._closed:
return
- data = yield self._ws.read_message()
- message = json.loads(data)
- status_code = message["status"]["code"]
- data = message["result"]["data"]
- msg = message["status"]["message"]
- meta = message["result"]["meta"]
- aggregateTo = "list" if "aggregateTo" not in meta else meta["aggregateTo"]
+ recv_message = yield self._websocket.read_message()
+ recv_message = json.loads(recv_message)
+ status_code = recv_message["status"]["code"]
+ aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list")
+ # authentification required then
if status_code == 407:
- self._authenticate(self._username, self._password)
- yield self.receive()
+ self._websocket.write_message(
+ b"".join([b"\x21",
+ b"application/vnd.gremlin-v2.0+json",
+ json.dumps({
+ "requestId": {
+ "@type": "g:UUID",
+ "@value": str(uuid.uuid4())
+ },
+ "op": "authentication",
+ "processor": "traversal",
+ "args": {
+ "sasl": base64.b64encode(
+ b"".join([b"\x00", self._username.encode("utf-8"),
+ b"\x00", self._password.encode("utf-8")])).decode()
+ }
+ }).encode("utf-8")]), binary=True)
+ results = yield self.receive()
+ raise gen.Return(results)
elif status_code == 204:
self._closed = True
return
elif status_code in [200, 206]:
results = []
- for item in data:
+ for item in recv_message["result"]["data"]:
results.append(GraphSONReader._objectify(item))
if status_code == 200:
self._closed = True
@@ -212,4 +203,4 @@ class Response:
else:
self._closed = True
raise GremlinServerError(
- "{0}: {1}".format(status_code, msg))
+ "{0}: {1}".format(status_code, recv_message["status"]["message"]))