You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by da...@apache.org on 2016/12/07 18:36:43 UTC
tinkerpop git commit: Implemented promise API for gremlin-python.
Side-effect retrieval is problematic.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1490 57eb5ec62 -> 54fac5146
Implemented promise API for gremlin-python. Side-effect retrieval is problematic.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/54fac514
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/54fac514
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/54fac514
Branch: refs/heads/TINKERPOP-1490
Commit: 54fac5146a5ac067e75f2fbfe135fae2da641c35
Parents: 57eb5ec
Author: davebshow <da...@gmail.com>
Authored: Wed Dec 7 13:35:42 2016 -0500
Committer: davebshow <da...@gmail.com>
Committed: Wed Dec 7 13:35:42 2016 -0500
----------------------------------------------------------------------
.../python/TraversalSourceGenerator.groovy | 27 +++++++++
.../driver/driver_remote_connection.py | 21 +++++--
.../gremlin_python/driver/remote_connection.py | 7 +++
.../jython/gremlin_python/process/traversal.py | 32 +++++++++--
.../driver/test_driver_remote_connection.py | 59 ++++++++++++++++++++
5 files changed, 137 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
index a3684cb..debd61d 100644
--- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
+++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
@@ -114,6 +114,28 @@ class Traversal(object):
except StopIteration: return tempList
tempList.append(temp)
return tempList
+ def promise(self, cb=None):
+ self.traversal_strategies.apply_async_strategies(self)
+ future_traversers = self.traversers
+ future = type(future_traversers)()
+ def process(f):
+ try:
+ traversers = f.result()
+ except Exception as e:
+ future.set_exception(e)
+ else:
+ self.traversers = iter(traversers)
+ if cb:
+ try:
+ result = cb(self)
+ except Exception as e:
+ future.set_exception(e)
+ else:
+ future.set_result(result)
+ else:
+ future.set_result(self)
+ future_traversers.add_done_callback(process)
+ return future
""")
@@ -223,6 +245,9 @@ class TraversalStrategies(object):
def apply_strategies(self, traversal):
for traversal_strategy in self.traversal_strategies:
traversal_strategy.apply(traversal)
+ def apply_async_strategies(self, traversal):
+ for traversal_strategy in self.traversal_strategies:
+ traversal_strategy.apply_async(traversal)
def __repr__(self):
return str(self.traversal_strategies)
@@ -233,6 +258,8 @@ class TraversalStrategy(object):
self.configuration = {} if configuration is None else configuration
def apply(self, traversal):
return
+ def apply_async(self, traversal):
+ return
def __eq__(self, other):
return isinstance(other, self.__class__)
def __hash__(self):
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index eab06f1..bcf6d15 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -17,9 +17,11 @@ specific language governing permissions and limitations
under the License.
"""
import base64
+import functools
import json
import uuid
from tornado import gen
+from tornado import concurrent
from tornado import ioloop
from tornado import websocket
@@ -51,10 +53,15 @@ class DriverRemoteConnection(RemoteConnection):
'''
request_id = str(uuid.uuid4())
traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
- side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
- side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
- side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
- return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value, side_effect_close))
+ keys, value, close = self._get_side_effect_lambdas(request_id)
+ return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close))
+
+ def submit_async(self, bytecode):
+ request_id = str(uuid.uuid4())
+ future_traversers = self.submit_traversal_bytecode(request_id, bytecode)
+ keys, value, close = self._get_side_effect_lambdas(request_id)
+ side_effects = RemoteTraversalSideEffects(keys, value, close)
+ return RemoteTraversal(future_traversers, side_effects)
@gen.coroutine
def submit_traversal_bytecode(self, request_id, bytecode):
@@ -135,6 +142,12 @@ class DriverRemoteConnection(RemoteConnection):
result = yield self._execute_message(message)
raise gen.Return(result)
+ def _get_side_effect_lambdas(self, request_id):
+ side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
+ side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
+ side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
+ return side_effect_keys, side_effect_value, side_effect_close
+
@gen.coroutine
def _execute_message(self, send_message):
send_message = b"".join([b"\x21",
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
index 46fb760..93c92b7 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
@@ -95,3 +95,10 @@ class RemoteStrategy(TraversalStrategy):
remote_traversal = self.remote_connection.submit(traversal.bytecode)
traversal.side_effects = remote_traversal.side_effects
traversal.traversers = remote_traversal.traversers
+
+ def apply_async(self, traversal):
+ if traversal.traversers is None:
+ remote_traversal = self.remote_connection.submit_async(
+ traversal.bytecode)
+ traversal.side_effects = remote_traversal.side_effects
+ traversal.traversers = remote_traversal.traversers
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
index d30db35..3fdd50c 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
@@ -77,6 +77,28 @@ class Traversal(object):
except StopIteration: return tempList
tempList.append(temp)
return tempList
+ def promise(self, cb=None):
+ self.traversal_strategies.apply_async_strategies(self)
+ future_traversers = self.traversers
+ future = type(future_traversers)()
+ def process(f):
+ try:
+ traversers = f.result()
+ except Exception as e:
+ future.set_exception(e)
+ else:
+ self.traversers = iter(traversers)
+ if cb:
+ try:
+ result = cb(self)
+ except Exception as e:
+ future.set_exception(e)
+ else:
+ future.set_result(result)
+ else:
+ future.set_result(self)
+ future_traversers.add_done_callback(process)
+ return future
Barrier = Enum('Barrier', 'normSack')
@@ -118,10 +140,6 @@ statics.add_static('keyDecr', Order.keyDecr)
statics.add_static('valueDecr', Order.valueDecr)
statics.add_static('shuffle', Order.shuffle)
-Pick = Enum('Pick', 'any none')
-statics.add_static('any', Pick.any)
-statics.add_static('none', Pick.none)
-
Pop = Enum('Pop', 'all_ first last')
statics.add_static('first', Pop.first)
statics.add_static('last', Pop.last)
@@ -286,6 +304,9 @@ class TraversalStrategies(object):
def apply_strategies(self, traversal):
for traversal_strategy in self.traversal_strategies:
traversal_strategy.apply(traversal)
+ def apply_async_strategies(self, traversal):
+ for traversal_strategy in self.traversal_strategies:
+ traversal_strategy.apply_async(traversal)
def __repr__(self):
return str(self.traversal_strategies)
@@ -296,6 +317,8 @@ class TraversalStrategy(object):
self.configuration = {} if configuration is None else configuration
def apply(self, traversal):
return
+ def apply_async(self, traversal):
+ return
def __eq__(self, other):
return isinstance(other, self.__class__)
def __hash__(self):
@@ -373,4 +396,3 @@ class Binding(object):
def __init__(self,key,value):
self.key = key
self.value = value
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
index 9f3e466..d825dad 100644
--- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
@@ -24,6 +24,8 @@ from unittest import TestCase
import pytest
+from tornado import ioloop, gen
+
from gremlin_python import statics
from gremlin_python.statics import long
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
@@ -188,6 +190,63 @@ class TestDriverRemoteConnection(TestCase):
t.side_effects.value_lambda('b')
connection.close()
+ def test_promise(self):
+ loop = ioloop.IOLoop.current()
+ connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+ g = Graph().traversal().withRemote(connection)
+
+ @gen.coroutine
+ def go():
+ future_traversal = g.V().promise(lambda x: x.toList())
+ assert not future_traversal.done()
+ resp = yield future_traversal
+ assert future_traversal.done()
+ assert len(resp) == 6
+ count = yield g.V().count().promise(lambda x: x.next())
+ assert count == 6
+
+ loop.run_sync(go)
+
+ def test_promise_side_effects(self):
+ loop = ioloop.IOLoop.current()
+ connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+ g = Graph().traversal().withRemote(connection)
+
+ # Side effects are problematic in coroutines.
+ # Because they are designed to be synchronous (calling `run_sync`)
+ # they result in an error if called from a coroutine because
+ # the event loop is already running
+ @gen.coroutine
+ def go():
+ traversal = yield g.V().aggregate('a').promise()
+ # Trying to get side effect keys throws error - BAD
+ with pytest.raises(RuntimeError):
+ keys = traversal.side_effects.keys()
+ # IOLoop is now hosed.
+
+ loop.run_sync(go)
+
+ # Get a new IOLoop - this should happen for each test case.
+ connection.close()
+ ioloop.IOLoop.clear_instance()
+ loop.close()
+ loop = ioloop.IOLoop()
+ loop.make_current()
+
+ connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+ g = Graph().traversal().withRemote(connection)
+
+ # If we return the traversal though, we can use side effects per usual.
+ @gen.coroutine
+ def go():
+ traversal = yield g.V().aggregate('a').promise()
+ raise gen.Return(traversal) # Weird legacy Python compatible idiom
+
+ # See, normal side effects.
+ traversal = loop.run_sync(go)
+ a, = traversal.side_effects.keys()
+ assert a == 'a'
+
if __name__ == '__main__':
test = False