You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by da...@apache.org on 2017/01/30 18:00:30 UTC
[3/7] tinkerpop git commit: fixed error handling for read task, using read task as done future on resultset
fixed error handling for read task, using read task as done future on resultset
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/00786389
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/00786389
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/00786389
Branch: refs/heads/TINKERPOP-1599
Commit: 00786389303601b0da9dc2bbcc8a43f0497aebef
Parents: 2577a13
Author: davebshow <da...@gmail.com>
Authored: Sat Jan 28 12:16:47 2017 -0500
Committer: davebshow <da...@gmail.com>
Committed: Mon Jan 30 11:51:24 2017 -0500
----------------------------------------------------------------------
.../jython/gremlin_python/driver/connection.py | 3 ++-
.../driver/driver_remote_connection.py | 6 +++--
.../jython/gremlin_python/driver/protocol.py | 15 +++++++-----
.../gremlin_python/driver/remote_connection.py | 11 ++++-----
.../jython/gremlin_python/driver/resultset.py | 24 ++++++++++++++++----
5 files changed, 39 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
index 2f59883..44ca8a3 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py
@@ -63,7 +63,8 @@ class Connection:
future.set_exception(e)
else:
# Start receive task
- self._executor.submit(self._receive)
+ done = self._executor.submit(self._receive)
+ result_set.done = done
future.set_result(result_set)
future_write.add_done_callback(cb)
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index 4e70a87..104c1f7 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -20,10 +20,10 @@ from concurrent.futures import Future
from gremlin_python.driver import client
from gremlin_python.driver.remote_connection import (
- RemoteTraversal, RemoteTraversalSideEffects)
+ RemoteConnection, RemoteTraversal, RemoteTraversalSideEffects)
-class DriverRemoteConnection:
+class DriverRemoteConnection(RemoteConnection):
def __init__(self, url, traversal_source, protocol_factory=None,
transport_factory=None, pool_size=None, max_workers=None,
@@ -31,6 +31,8 @@ class DriverRemoteConnection:
self._client = client.Client(url, traversal_source, protocol_factory,
transport_factory, pool_size, max_workers,
None, username, password)
+ self._url = self._client._url
+ self._traversal_source = self._client._traversal_source
def close(self):
self._client.close()
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
index 6ea17a5..df72bf7 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py
@@ -27,6 +27,10 @@ import six
from gremlin_python.driver import serializer, request
+class GremlinServerError(Exception):
+ pass
+
+
@six.add_metaclass(abc.ABCMeta)
class AbstractBaseProtocol:
@@ -66,7 +70,7 @@ class GremlinServerWSProtocol(AbstractBaseProtocol):
result_set = results_dict[request_id]
status_code = data['status']['code']
aggregate_to = data['result']['meta'].get('aggregateTo', 'list')
- result_set._aggregate_to = aggregate_to
+ result_set.aggregate_to = aggregate_to
if status_code == 407:
auth = b''.join([b'\x00', self._username.encode('utf-8'),
b'\x00', self._password.encode('utf-8')])
@@ -77,7 +81,7 @@ class GremlinServerWSProtocol(AbstractBaseProtocol):
data = self._transport.read()
self.data_received(data, results_dict)
elif status_code == 204:
- result_set.done.set_result(None)
+ result_set.stream.put_nowait([])
del results_dict[request_id]
elif status_code in [200, 206]:
results = []
@@ -89,10 +93,9 @@ class GremlinServerWSProtocol(AbstractBaseProtocol):
data = self._transport.read()
self.data_received(data, results_dict)
else:
- result_set.done.set_result(None)
+ # result_set.done.set_result(None)
del results_dict[request_id]
else:
- result_set.stream.put_nowait(GremlinServerError(
- "{0}: {1}".format(status_code, data["status"]["message"])))
- result_set.done.set_result(None)
del results_dict[request_id]
+ raise GremlinServerError(
+ "{0}: {1}".format(status_code, data["status"]["message"]))
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
index a95ea10..4d34e68 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
@@ -77,6 +77,7 @@ class RemoteTraversalSideEffects(traversal.TraversalSideEffects):
return self._keys
def get(self, key):
+
if not self._side_effects.get(key):
if not self._closed:
message = request.RequestMessage(
@@ -101,17 +102,15 @@ class RemoteTraversalSideEffects(traversal.TraversalSideEffects):
return results
def _aggregate_results(self, result_set):
- # Need to double check how all this works
- aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}}
+ aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
+ 'none': None}
results = None
for msg in result_set:
if results is None:
- aggregate_to = result_set._aggregate_to
- if aggregate_to:
- results = aggregates.get(aggregate_to, [])
+ aggregate_to = result_set.aggregate_to
+ results = aggregates.get(aggregate_to, [])
# on first message, get the right result data structure
# if there is no update to a structure, then the item is the result
- # not really sure about this...
if results is None:
results = msg[0]
# updating a map is different than a list or a set
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
index e12e0a0..01c1968 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py
@@ -24,10 +24,18 @@ class ResultSet:
def __init__(self, stream, request_id):
self._stream = stream
self._request_id = request_id
- self._done = Future()
+ self._done = None
self._aggregate_to = None
@property
+ def aggregate_to(self):
+ return self._aggregate_to
+
+ @aggregate_to.setter
+ def aggregate_to(self, val):
+ self._aggregate_to = val
+
+ @property
def request_id(self):
return self._request_id
@@ -51,11 +59,17 @@ class ResultSet:
def done(self):
return self._done
+ @done.setter
+ def done(self, future):
+ self._done = future
+
def one(self):
- if self.stream.empty() and self.done.done():
- return
- result = self.stream.get()
- return result
+ while not self.done.done():
+ if not self.stream.empty():
+ return self.stream.get_nowait()
+ if not self.stream.empty():
+ return self.stream.get_nowait()
+ return self.done.result()
def all(self):
future = Future()