You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/12/16 17:19:19 UTC

[07/14] tinkerpop git commit: implemented promise API for gremlin-python. side effect retrieval is problematic

Implemented a promise API for gremlin-python. Side effect retrieval is still problematic.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b0b93308
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b0b93308
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b0b93308

Branch: refs/heads/master
Commit: b0b933084b38f77fcd4843796980cabe313ed1e4
Parents: f02e183
Author: davebshow <da...@gmail.com>
Authored: Wed Dec 7 13:35:42 2016 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Dec 16 10:03:41 2016 -0500

----------------------------------------------------------------------
 .../python/TraversalSourceGenerator.groovy      | 27 +++++++++
 .../driver/driver_remote_connection.py          | 21 +++++--
 .../gremlin_python/driver/remote_connection.py  |  7 +++
 .../jython/gremlin_python/process/traversal.py  | 28 +++++++++-
 .../driver/test_driver_remote_connection.py     | 59 ++++++++++++++++++++
 5 files changed, 137 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
index 73ffcb6..fc76b71 100644
--- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
+++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
@@ -114,6 +114,28 @@ class Traversal(object):
                 except StopIteration: return tempList
                 tempList.append(temp)
             return tempList
+    def promise(self, cb=None):
+        self.traversal_strategies.apply_async_strategies(self)
+        future_traversers = self.traversers
+        future = type(future_traversers)()
+        def process(f):
+            try:
+                traversers = f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                self.traversers = iter(traversers)
+                if cb:
+                    try:
+                        result = cb(self)
+                    except Exception as e:
+                        future.set_exception(e)
+                    else:
+                        future.set_result(result)
+                else:
+                    future.set_result(self)
+        future_traversers.add_done_callback(process)
+        return future
 
 
 """)
@@ -223,6 +245,9 @@ class TraversalStrategies(object):
     def apply_strategies(self, traversal):
         for traversal_strategy in self.traversal_strategies:
             traversal_strategy.apply(traversal)
+    def apply_async_strategies(self, traversal):
+        for traversal_strategy in self.traversal_strategies:
+            traversal_strategy.apply_async(traversal)
     def __repr__(self):
         return str(self.traversal_strategies)
 
@@ -233,6 +258,8 @@ class TraversalStrategy(object):
         self.configuration = {} if configuration is None else configuration
     def apply(self, traversal):
         return
+    def apply_async(self, traversal):
+        return
     def __eq__(self, other):
         return isinstance(other, self.__class__)
     def __hash__(self):

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index d975f60..babb113 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -17,9 +17,11 @@ specific language governing permissions and limitations
 under the License.
 """
 import base64
+import functools
 import json
 import uuid
 from tornado import gen
+from tornado import concurrent
 from tornado import ioloop
 from tornado import websocket
 
@@ -51,10 +53,15 @@ class DriverRemoteConnection(RemoteConnection):
         '''
         request_id = str(uuid.uuid4())
         traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
-        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
-        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
-        side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
-        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value, side_effect_close))
+        keys, value, close = self._get_side_effect_lambdas(request_id)
+        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close))
+
+    def submit_async(self, bytecode):
+        request_id = str(uuid.uuid4())
+        future_traversers = self.submit_traversal_bytecode(request_id, bytecode)
+        keys, value, close = self._get_side_effect_lambdas(request_id)
+        side_effects = RemoteTraversalSideEffects(keys, value, close)
+        return RemoteTraversal(future_traversers, side_effects)
 
     @gen.coroutine
     def submit_traversal_bytecode(self, request_id, bytecode):
@@ -135,6 +142,12 @@ class DriverRemoteConnection(RemoteConnection):
         result = yield self._execute_message(message)
         raise gen.Return(result)
 
+    def _get_side_effect_lambdas(self, request_id):
+        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
+        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
+        side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
+        return side_effect_keys, side_effect_value, side_effect_close
+
     @gen.coroutine
     def _execute_message(self, send_message):
         send_message = b"".join([b"\x21",

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
index 46fb760..93c92b7 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
@@ -95,3 +95,10 @@ class RemoteStrategy(TraversalStrategy):
             remote_traversal = self.remote_connection.submit(traversal.bytecode)
             traversal.side_effects = remote_traversal.side_effects
             traversal.traversers = remote_traversal.traversers
+
+    def apply_async(self, traversal):
+        if traversal.traversers is None:
+            remote_traversal = self.remote_connection.submit_async(
+                traversal.bytecode)
+            traversal.side_effects = remote_traversal.side_effects
+            traversal.traversers = remote_traversal.traversers

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
index a7b6118..2c2db59 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
@@ -77,6 +77,28 @@ class Traversal(object):
                 except StopIteration: return tempList
                 tempList.append(temp)
             return tempList
+    def promise(self, cb=None):
+        self.traversal_strategies.apply_async_strategies(self)
+        future_traversers = self.traversers
+        future = type(future_traversers)()
+        def process(f):
+            try:
+                traversers = f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                self.traversers = iter(traversers)
+                if cb:
+                    try:
+                        result = cb(self)
+                    except Exception as e:
+                        future.set_exception(e)
+                    else:
+                        future.set_result(result)
+                else:
+                    future.set_result(self)
+        future_traversers.add_done_callback(process)
+        return future
 
 
 Barrier = Enum('Barrier', 'normSack')
@@ -286,6 +308,9 @@ class TraversalStrategies(object):
     def apply_strategies(self, traversal):
         for traversal_strategy in self.traversal_strategies:
             traversal_strategy.apply(traversal)
+    def apply_async_strategies(self, traversal):
+        for traversal_strategy in self.traversal_strategies:
+            traversal_strategy.apply_async(traversal)
     def __repr__(self):
         return str(self.traversal_strategies)
 
@@ -296,6 +321,8 @@ class TraversalStrategy(object):
         self.configuration = {} if configuration is None else configuration
     def apply(self, traversal):
         return
+    def apply_async(self, traversal):
+        return
     def __eq__(self, other):
         return isinstance(other, self.__class__)
     def __hash__(self):
@@ -379,4 +406,3 @@ class Binding(object):
         return hash(self.key) + hash(self.value)
     def __repr__(self):
         return "binding[" + self.key + "=" + str(self.value) + "]"
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
index 6b057d5..c9e64c5 100644
--- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
@@ -24,6 +24,8 @@ from unittest import TestCase
 
 import pytest
 
+from tornado import ioloop, gen
+
 from gremlin_python import statics
 from gremlin_python.statics import long
 from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
@@ -193,6 +195,63 @@ class TestDriverRemoteConnection(TestCase):
             t.side_effects.value_lambda('b')
         connection.close()
 
+    def test_promise(self):
+        loop = ioloop.IOLoop.current()
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        @gen.coroutine
+        def go():
+            future_traversal = g.V().promise(lambda x: x.toList())
+            assert not future_traversal.done()
+            resp = yield future_traversal
+            assert future_traversal.done()
+            assert len(resp) == 6
+            count = yield g.V().count().promise(lambda x: x.next())
+            assert count == 6
+
+        loop.run_sync(go)
+
+    def test_promise_side_effects(self):
+        loop = ioloop.IOLoop.current()
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        # Side effects are problematic inside coroutines.
+        # The side effect accessors are synchronous (they call `run_sync`),
+        # so invoking them from within a coroutine raises an error because
+        # the event loop is already running.
+        @gen.coroutine
+        def go():
+            traversal = yield g.V().aggregate('a').promise()
+            # Trying to get side effect keys throws error - BAD
+            with pytest.raises(RuntimeError):
+                keys = traversal.side_effects.keys()
+                # IOLoop is now hosed.
+
+        loop.run_sync(go)
+
+        # Get a new IOLoop - this should happen for each test case.
+        connection.close()
+        ioloop.IOLoop.clear_instance()
+        loop.close()
+        loop = ioloop.IOLoop()
+        loop.make_current()
+
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        # If we return the traversal though, we can use side effects per usual.
+        @gen.coroutine
+        def go():
+            traversal = yield g.V().aggregate('a').promise()
+            raise gen.Return(traversal)  # Weird legacy Python compatible idiom
+
+        # See, normal side effects.
+        traversal = loop.run_sync(go)
+        a, = traversal.side_effects.keys()
+        assert  a == 'a'
+
 
 if __name__ == '__main__':
     test = False