You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by da...@apache.org on 2016/12/07 18:36:43 UTC

tinkerpop git commit: implemented promise API for gremlin-python. side effect retrieval is problematic

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1490 57eb5ec62 -> 54fac5146


implemented promise API for gremlin-python. side effect retrieval is problematic


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/54fac514
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/54fac514
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/54fac514

Branch: refs/heads/TINKERPOP-1490
Commit: 54fac5146a5ac067e75f2fbfe135fae2da641c35
Parents: 57eb5ec
Author: davebshow <da...@gmail.com>
Authored: Wed Dec 7 13:35:42 2016 -0500
Committer: davebshow <da...@gmail.com>
Committed: Wed Dec 7 13:35:42 2016 -0500

----------------------------------------------------------------------
 .../python/TraversalSourceGenerator.groovy      | 27 +++++++++
 .../driver/driver_remote_connection.py          | 21 +++++--
 .../gremlin_python/driver/remote_connection.py  |  7 +++
 .../jython/gremlin_python/process/traversal.py  | 32 +++++++++--
 .../driver/test_driver_remote_connection.py     | 59 ++++++++++++++++++++
 5 files changed, 137 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
index a3684cb..debd61d 100644
--- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
+++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy
@@ -114,6 +114,28 @@ class Traversal(object):
                 except StopIteration: return tempList
                 tempList.append(temp)
             return tempList
+    def promise(self, cb=None):
+        self.traversal_strategies.apply_async_strategies(self)
+        future_traversers = self.traversers
+        future = type(future_traversers)()
+        def process(f):
+            try:
+                traversers = f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                self.traversers = iter(traversers)
+                if cb:
+                    try:
+                        result = cb(self)
+                    except Exception as e:
+                        future.set_exception(e)
+                    else:
+                        future.set_result(result)
+                else:
+                    future.set_result(self)
+        future_traversers.add_done_callback(process)
+        return future
 
 
 """)
@@ -223,6 +245,9 @@ class TraversalStrategies(object):
     def apply_strategies(self, traversal):
         for traversal_strategy in self.traversal_strategies:
             traversal_strategy.apply(traversal)
+    def apply_async_strategies(self, traversal):
+        for traversal_strategy in self.traversal_strategies:
+            traversal_strategy.apply_async(traversal)
     def __repr__(self):
         return str(self.traversal_strategies)
 
@@ -233,6 +258,8 @@ class TraversalStrategy(object):
         self.configuration = {} if configuration is None else configuration
     def apply(self, traversal):
         return
+    def apply_async(self, traversal):
+        return
     def __eq__(self, other):
         return isinstance(other, self.__class__)
     def __hash__(self):

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
index eab06f1..bcf6d15 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py
@@ -17,9 +17,11 @@ specific language governing permissions and limitations
 under the License.
 """
 import base64
+import functools
 import json
 import uuid
 from tornado import gen
+from tornado import concurrent
 from tornado import ioloop
 from tornado import websocket
 
@@ -51,10 +53,15 @@ class DriverRemoteConnection(RemoteConnection):
         '''
         request_id = str(uuid.uuid4())
         traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode))
-        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
-        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
-        side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
-        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value, side_effect_close))
+        keys, value, close = self._get_side_effect_lambdas(request_id)
+        return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close))
+
+    def submit_async(self, bytecode):
+        request_id = str(uuid.uuid4())
+        future_traversers = self.submit_traversal_bytecode(request_id, bytecode)
+        keys, value, close = self._get_side_effect_lambdas(request_id)
+        side_effects = RemoteTraversalSideEffects(keys, value, close)
+        return RemoteTraversal(future_traversers, side_effects)
 
     @gen.coroutine
     def submit_traversal_bytecode(self, request_id, bytecode):
@@ -135,6 +142,12 @@ class DriverRemoteConnection(RemoteConnection):
         result = yield self._execute_message(message)
         raise gen.Return(result)
 
+    def _get_side_effect_lambdas(self, request_id):
+        side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id))
+        side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key))
+        side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id))
+        return side_effect_keys, side_effect_value, side_effect_close
+
     @gen.coroutine
     def _execute_message(self, send_message):
         send_message = b"".join([b"\x21",

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
index 46fb760..93c92b7 100644
--- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
+++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py
@@ -95,3 +95,10 @@ class RemoteStrategy(TraversalStrategy):
             remote_traversal = self.remote_connection.submit(traversal.bytecode)
             traversal.side_effects = remote_traversal.side_effects
             traversal.traversers = remote_traversal.traversers
+
+    def apply_async(self, traversal):
+        if traversal.traversers is None:
+            remote_traversal = self.remote_connection.submit_async(
+                traversal.bytecode)
+            traversal.side_effects = remote_traversal.side_effects
+            traversal.traversers = remote_traversal.traversers

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
index d30db35..3fdd50c 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py
@@ -77,6 +77,28 @@ class Traversal(object):
                 except StopIteration: return tempList
                 tempList.append(temp)
             return tempList
+    def promise(self, cb=None):
+        self.traversal_strategies.apply_async_strategies(self)
+        future_traversers = self.traversers
+        future = type(future_traversers)()
+        def process(f):
+            try:
+                traversers = f.result()
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                self.traversers = iter(traversers)
+                if cb:
+                    try:
+                        result = cb(self)
+                    except Exception as e:
+                        future.set_exception(e)
+                    else:
+                        future.set_result(result)
+                else:
+                    future.set_result(self)
+        future_traversers.add_done_callback(process)
+        return future
 
 
 Barrier = Enum('Barrier', 'normSack')
@@ -118,10 +140,6 @@ statics.add_static('keyDecr', Order.keyDecr)
 statics.add_static('valueDecr', Order.valueDecr)
 statics.add_static('shuffle', Order.shuffle)
 
-Pick = Enum('Pick', 'any none')
-statics.add_static('any', Pick.any)
-statics.add_static('none', Pick.none)
-
 Pop = Enum('Pop', 'all_ first last')
 statics.add_static('first', Pop.first)
 statics.add_static('last', Pop.last)
@@ -286,6 +304,9 @@ class TraversalStrategies(object):
     def apply_strategies(self, traversal):
         for traversal_strategy in self.traversal_strategies:
             traversal_strategy.apply(traversal)
+    def apply_async_strategies(self, traversal):
+        for traversal_strategy in self.traversal_strategies:
+            traversal_strategy.apply_async(traversal)
     def __repr__(self):
         return str(self.traversal_strategies)
 
@@ -296,6 +317,8 @@ class TraversalStrategy(object):
         self.configuration = {} if configuration is None else configuration
     def apply(self, traversal):
         return
+    def apply_async(self, traversal):
+        return
     def __eq__(self, other):
         return isinstance(other, self.__class__)
     def __hash__(self):
@@ -373,4 +396,3 @@ class Binding(object):
     def __init__(self,key,value):
         self.key = key
         self.value = value
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54fac514/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
index 9f3e466..d825dad 100644
--- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
+++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py
@@ -24,6 +24,8 @@ from unittest import TestCase
 
 import pytest
 
+from tornado import ioloop, gen
+
 from gremlin_python import statics
 from gremlin_python.statics import long
 from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
@@ -188,6 +190,63 @@ class TestDriverRemoteConnection(TestCase):
             t.side_effects.value_lambda('b')
         connection.close()
 
+    def test_promise(self):
+        loop = ioloop.IOLoop.current()
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        @gen.coroutine
+        def go():
+            future_traversal = g.V().promise(lambda x: x.toList())
+            assert not future_traversal.done()
+            resp = yield future_traversal
+            assert future_traversal.done()
+            assert len(resp) == 6
+            count = yield g.V().count().promise(lambda x: x.next())
+            assert count == 6
+
+        loop.run_sync(go)
+
+    def test_promise_side_effects(self):
+        loop = ioloop.IOLoop.current()
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        # Side effects are problematic in coroutines.
+        # They are designed to be synchronous (they call `run_sync`), so
+        # calling them from inside a coroutine results in an error because
+        # the event loop is already running.
+        @gen.coroutine
+        def go():
+            traversal = yield g.V().aggregate('a').promise()
+            # Trying to get the side effect keys raises a RuntimeError - BAD
+            with pytest.raises(RuntimeError):
+                keys = traversal.side_effects.keys()
+                # The IOLoop is now left in a broken state.
+
+        loop.run_sync(go)
+
+        # Get a new IOLoop - this should happen for each test case.
+        connection.close()
+        ioloop.IOLoop.clear_instance()
+        loop.close()
+        loop = ioloop.IOLoop()
+        loop.make_current()
+
+        connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g')
+        g = Graph().traversal().withRemote(connection)
+
+        # If we return the traversal from the coroutine, though, we can use side effects as usual.
+        @gen.coroutine
+        def go():
+            traversal = yield g.V().aggregate('a').promise()
+            raise gen.Return(traversal)  # Weird legacy Python compatible idiom
+
+        # Outside the coroutine, the side effects work normally.
+        traversal = loop.run_sync(go)
+        a, = traversal.side_effects.keys()
+        assert  a == 'a'
+
 
 if __name__ == '__main__':
     test = False