You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2018/02/23 12:01:07 UTC

[1/6] tinkerpop git commit: Apply edgeFunction in SparkMessenger

Repository: tinkerpop
Updated Branches:
  refs/heads/master 016111e5d -> d683206a7


Apply edgeFunction in SparkMessenger


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/744c4ecb
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/744c4ecb
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/744c4ecb

Branch: refs/heads/master
Commit: 744c4ecbc042acdc481045d6df5077d52d370ac9
Parents: 92a09d8
Author: zhuchenchen <zh...@didichuxing.com>
Authored: Mon Jan 22 11:15:41 2018 +0800
Committer: zhuchenchen <zh...@didichuxing.com>
Committed: Mon Jan 22 11:15:41 2018 +0800

----------------------------------------------------------------------
 .../spark/process/computer/SparkMessenger.java  |  2 +-
 .../process/computer/SparkMessengerTest.java    | 86 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
index aab7ecd..53a755c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -63,7 +63,7 @@ public final class SparkMessenger<M> implements Messenger<M> {
             final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
             final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex);
             final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
+            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), localMessageScope.getEdgeFunction().apply(message, edge))));
         } else {
             ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
new file mode 100644
index 0000000..c280ab2
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * Verifies that {@link SparkMessenger} applies the edge function of a {@link MessageScope.Local}.
+ *
+ * @author Dean Zhu
+ */
+public class SparkMessengerTest extends AbstractSparkTest {
+
+    /**
+     * Sends one message from vertex 0 over its two in-edges and checks the edge function's result for
+     * each edge: the message is kept on {@code mocked_edge_label1} and replaced by {@code null} otherwise.
+     */
+    @Test
+    public void testSparkMessenger() throws Exception {
+        // Scope whose edge function only lets the message through on "mocked_edge_label1" edges.
+        final MessageScope.Local<String> orderSrcMessageScope = MessageScope.Local
+                .of(__::inE, new BiFunction<String, Edge, String>() {
+                    @Override
+                    public String apply(final String message, final Edge edge) {
+                        return "mocked_edge_label1".equals(edge.label()) ? message : null;
+                    }
+                });
+
+        // Vertex 0 gets one in-edge per label: label1 from vertex 1 and label2 from vertex 2.
+        final StarGraph starGraph = StarGraph.open();
+        final Vertex vertex0 = starGraph.addVertex(T.id, 0, T.label, "mocked_vertex_label1");
+        final Vertex vertex1 = starGraph.addVertex(T.id, 1, T.label, "mocked_vertex_label2");
+        final Vertex vertex2 = starGraph.addVertex(T.id, 2, T.label, "mocked_vertex_label2");
+        vertex1.addEdge("mocked_edge_label1", vertex0);
+        vertex2.addEdge("mocked_edge_label2", vertex0);
+
+        // Send "a" from vertex 0 through the filtering scope.
+        final SparkMessenger<String> messenger = new SparkMessenger<>();
+        messenger.setVertexAndIncomingMessages(vertex0, Arrays.asList("a", "b", "c"));
+        messenger.sendMessage(orderSrcMessageScope, "a");
+
+        // Match each outgoing message by recipient id rather than by list position so the test
+        // does not depend on the order in which the in-edges happen to be visited.
+        final List<Tuple2<Object, String>> outgoingMessages = messenger.getOutgoingMessages();
+        Assert.assertEquals(2, outgoingMessages.size());
+        for (final Tuple2<Object, String> outgoing : outgoingMessages) {
+            if (vertex1.id().equals(outgoing._1())) {
+                Assert.assertEquals("a", outgoing._2());
+            } else {
+                Assert.assertEquals(vertex2.id(), outgoing._1());
+                Assert.assertNull(outgoing._2());
+            }
+        }
+    }
+}
\ No newline at end of file


[2/6] tinkerpop git commit: Merge branch 'pr-788' into TINKERPOP-1872

Posted by sp...@apache.org.
Merge branch 'pr-788' into TINKERPOP-1872


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/4b34bb26
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/4b34bb26
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/4b34bb26

Branch: refs/heads/master
Commit: 4b34bb26a1ff381d33c4be156900d3c5e7a27279
Parents: 0bcab7f 744c4ec
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jan 30 09:26:57 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jan 30 09:26:57 2018 -0500

----------------------------------------------------------------------
 .../spark/process/computer/SparkMessenger.java  |  2 +-
 .../process/computer/SparkMessengerTest.java    | 86 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[4/6] tinkerpop git commit: TINKERPOP-1872 Updated changelog

Posted by sp...@apache.org.
TINKERPOP-1872 Updated changelog


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0f0d97a2
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0f0d97a2
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0f0d97a2

Branch: refs/heads/master
Commit: 0f0d97a26b7cfe7fa7546362fdac2a7ee83fb8f8
Parents: 6456cbe
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jan 30 09:28:59 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jan 30 09:28:59 2018 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0f0d97a2/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 7377d24..4d93912 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Removed hardcoded expectation in metrics serialization test suite as different providers may have different outputs.
 * Added `IndexedTraverserSet` which indexes on the value of a `Traverser` thus improving performance when used.
 * Utilized `IndexedTraverserSet` in `TraversalVertexProgram` to avoid extra iteration when doing `Vertex` lookups.
+* Fixed bug where `SparkMessenger` was not applying the `edgeFunction` from `MessageScope`.
 * Fixed a bug in `ComputerAwareStep` that didn't handle `reset()` properly and thus occasionally produced some extra traversers.
 
 [[release-3-2-7]]


[6/6] tinkerpop git commit: Merge branch 'tp32'

Posted by sp...@apache.org.
Merge branch 'tp32'


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d683206a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d683206a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d683206a

Branch: refs/heads/master
Commit: d683206a76208bfcff945eb54f2818be9c66b419
Parents: 016111e 3df6c58
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Feb 23 07:00:50 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Feb 23 07:00:50 2018 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../process/computer/GraphComputerTest.java     | 70 ++++++++++++++++
 .../spark/process/computer/SparkMessenger.java  |  2 +-
 .../process/computer/SparkMessengerTest.java    | 86 ++++++++++++++++++++
 4 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d683206a/CHANGELOG.asciidoc
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d683206a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------


[3/6] tinkerpop git commit: TINKERPOP-1872 Test for edgeFunction on Messengers

Posted by sp...@apache.org.
TINKERPOP-1872 Test for edgeFunction on Messengers


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6456cbe6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6456cbe6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6456cbe6

Branch: refs/heads/master
Commit: 6456cbe6eed1818f5c60172c37037f65061bf2c2
Parents: 4b34bb2
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jan 30 09:27:06 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jan 30 09:27:06 2018 -0500

----------------------------------------------------------------------
 .../process/computer/GraphComputerTest.java     | 70 ++++++++++++++++++++
 1 file changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6456cbe6/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 380887b..8c846d5 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1656,6 +1656,76 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
     /////////////////////////////////////////////
 
+    // Expected counts: what "josh" sent over each edge after the edge function scaled it by weight * 10 (0 for "josh" itself).
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldSupportMultipleScopesWithEdgeFunction() throws ExecutionException, InterruptedException {
+        final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexWithEdgeFunctionProgram()).submit().get();
+        assertEquals(0L, result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexWithEdgeFunctionProgram.MEMORY_KEY).value());
+        assertEquals(4L, result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexWithEdgeFunctionProgram.MEMORY_KEY).value());
+        assertEquals(10L, result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexWithEdgeFunctionProgram.MEMORY_KEY).value());
+        assertEquals(20L, result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexWithEdgeFunctionProgram.MEMORY_KEY).value());
+    }
+
+    /** Vertex program where "josh" sends over both scopes and every vertex stores the sum of what it receives. */
+    public static class MultiScopeVertexWithEdgeFunctionProgram extends StaticVertexProgram<Long> {
+
+        private static final String MEMORY_KEY = "count";
+
+        // Both scopes multiply the message by the traversed edge's "weight" * 10, rounded to a long.
+        private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE, (message, edge) -> message * Math.round(edge.<Double>values("weight").next() * 10));
+        private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE, (message, edge) -> message * Math.round(edge.<Double>values("weight").next() * 10));
+
+        @Override
+        public void setup(final Memory memory) {
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.VERTEX_PROPERTIES;
+        }
+
+        @Override
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(final Memory memory) {
+            final Set<MessageScope> bothScopes = new HashSet<>();
+            bothScopes.add(this.countMessageScopeIn);
+            bothScopes.add(this.countMessageScopeOut);
+            return bothScopes;
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger<Long> messenger, final Memory memory) {
+            final int iteration = memory.getIteration();
+            if (iteration == 0) {
+                // Only "josh" seeds messages: 2 through the in-edge scope and 1 through the out-edge scope.
+                if (vertex.value("name").equals("josh")) {
+                    messenger.sendMessage(this.countMessageScopeIn, 2L);
+                    messenger.sendMessage(this.countMessageScopeOut, 1L);
+                }
+            } else if (iteration == 1) {
+                final long received = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (sum, msg) -> sum + msg);
+                vertex.property(MEMORY_KEY, received);
+            }
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            return 1 == memory.getIteration();
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+    }
+
+    /////////////////////////////////////////////
+
     @Test
     @LoadGraphWith(MODERN)
     public void shouldSupportGraphFilter() throws Exception {


[5/6] tinkerpop git commit: Merge branch 'TINKERPOP-1872' into tp32

Posted by sp...@apache.org.
Merge branch 'TINKERPOP-1872' into tp32

Conflicts:
	CHANGELOG.asciidoc


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3df6c580
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3df6c580
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3df6c580

Branch: refs/heads/master
Commit: 3df6c5806a585e8e1c381e4043338110ef8b4517
Parents: d42d54d 0f0d97a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Feb 23 07:00:39 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Feb 23 07:00:39 2018 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../process/computer/GraphComputerTest.java     | 70 ++++++++++++++++
 .../spark/process/computer/SparkMessenger.java  |  2 +-
 .../process/computer/SparkMessengerTest.java    | 86 ++++++++++++++++++++
 4 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3df6c580/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --cc CHANGELOG.asciidoc
index f1519b6,4d93912..1ed018c
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@@ -31,7 -27,7 +31,8 @@@ image::https://raw.githubusercontent.co
  * Removed hardcoded expectation in metrics serialization test suite as different providers may have different outputs.
  * Added `IndexedTraverserSet` which indexes on the value of a `Traverser` thus improving performance when used.
  * Utilized `IndexedTraverserSet` in `TraversalVertexProgram` to avoid extra iteration when doing `Vertex` lookups.
 +* Fixed a bug in Gremlin Console which prevented handling of `gremlin.sh` flags that had an "=" between the flag and its arguments.
+ * Fixed bug where `SparkMessenger` was not applying the `edgeFunction` from `MessageScope`.
  * Fixed a bug in `ComputerAwareStep` that didn't handle `reset()` properly and thus occasionally produced some extra traversers.
  
  [[release-3-2-7]]