You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/11/03 15:26:25 UTC

incubator-tinkerpop git commit: Optimized BLVP

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP3-904 [created] 8b8222da7


Optimized BLVP

BLVP (BulkLoaderVertexProgram) now uses EventStrategy to monitor what the actual BulkLoader implementation does (e.g. whether it creates a new vertex or just returns an existing one).


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8b8222da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8b8222da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8b8222da

Branch: refs/heads/TINKERPOP3-904
Commit: 8b8222da7fed57fa1e7e0232ab788944716404f7
Parents: 19b7ae0
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Tue Nov 3 15:14:12 2015 +0100
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Tue Nov 3 15:14:12 2015 +0100

----------------------------------------------------------------------
 .../computer/bulkloading/BulkLoader.java        |  31 +++++
 .../bulkloading/BulkLoaderVertexProgram.java    | 116 ++++++++++++++++---
 .../bulkloading/IncrementalBulkLoader.java      |  14 ++-
 .../structure/TinkerGraphPlayTest.java          |  30 ++---
 4 files changed, 158 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index dd5eaf4..2640d2e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -41,6 +41,22 @@ public interface BulkLoader {
     public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g);
 
     /**
+     * Creates a clone of the given edge between the given in- and out-vertices.
+     *
+     * @param edge      The edge to be cloned.
+     * @param outVertex The out-vertex in the given graph.
+     * @param inVertex  The in-vertex in the given graph.
+     * @param graph     The graph that holds the cloned edge after this method was called.
+     * @param g         A standard traversal source for the given graph.
+     * @return The cloned edge.
+     */
+    public default Edge createEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
+        final Edge result = outVertex.addEdge(edge.label(), inVertex);
+        edge.properties().forEachRemaining(property -> result.property(property.key(), property.value()));
+        return result;
+    }
+
+    /**
      * Gets or creates a clone of the given edge between the given in- and out-vertices.
      *
      * @param edge      The edge to be cloned.
@@ -53,6 +69,21 @@ public interface BulkLoader {
     public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g);
 
     /**
+     * Creates a clone of the given property for the given vertex.
+     *
+     * @param property The property to be cloned.
+     * @param vertex   The vertex in the given graph.
+     * @param graph    The graph that holds the given vertex.
+     * @param g        A standard traversal source for the given graph.
+     * @return The cloned property.
+     */
+    public default VertexProperty createVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+        final VertexProperty result = vertex.property(property.key(), property.value());
+        property.properties().forEachRemaining(metaProperty -> result.property(metaProperty.key(), metaProperty.value()));
+        return result;
+    }
+
+    /**
      * Gets or creates a clone of the given property for the given vertex.
      *
      * @param property The property to be cloned.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 99bb1f5..2950bfd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -31,8 +31,12 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.MutationListener;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
 import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
@@ -48,7 +52,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
@@ -74,12 +77,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     private GraphTraversalSource g;
     private long intermediateBatchSize;
 
-    private static final ThreadLocal<AtomicLong> counter = new ThreadLocal<AtomicLong>() {
-        @Override
-        protected AtomicLong initialValue() {
-            return new AtomicLong();
-        }
-    };
+    private BulkLoadingListener listener;
 
     private BulkLoaderVertexProgram() {
         messageScope = MessageScope.Local.of(__::inE);
@@ -116,17 +114,19 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
      * @param close Whether to close the current graph instance after calling commit() or not.
      */
     private void commit(final boolean close) {
-        if (!close && (intermediateBatchSize == 0L || counter.get().incrementAndGet() % intermediateBatchSize != 0))
+        if (!close && (intermediateBatchSize == 0L || listener.mutations() < intermediateBatchSize))
             return;
         if (null != graph) {
             if (graph.features().graph().supportsTransactions()) {
-                LOGGER.info("Committing transaction on Graph instance: {} [{}]", graph, counter.get().get());
+                LOGGER.info("Committing transaction on Graph instance: {} [{} mutations]", graph, listener.mutations());
                 try {
                     graph.tx().commit();
                     LOGGER.debug("Committed transaction on Graph instance: {}", graph);
+                    listener.resetCounter();
                 } catch (Exception e) {
                     LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
                     graph.tx().rollback();
+                    listener.resetCounter();
                     throw e;
                 }
             }
@@ -144,7 +144,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public void setup(final Memory memory) {
-        counter.get().set(0L);
     }
 
     @Override
@@ -172,7 +171,8 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             graph = GraphFactory.open(configuration.subset(WRITE_GRAPH_CFG_KEY));
             LOGGER.info("Opened Graph instance: {}", graph);
             try {
-                g = graph.traversal();
+                listener = new BulkLoadingListener();
+                g = GraphTraversalSource.build().with(EventStrategy.build().addListener(listener).create()).create(graph);
             } catch (Exception e) {
                 try {
                     graph.close();
@@ -205,12 +205,15 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     private void executeInternal(final Vertex sourceVertex, final Messenger<Tuple> messenger, final Memory memory) {
         if (memory.isInitialIteration()) {
+            this.listener.resetStats();
             // get or create the vertex
             final Vertex targetVertex = bulkLoader.getOrCreateVertex(sourceVertex, graph, g);
             // write all the properties of the vertex to the newly created vertex
             final Iterator<VertexProperty<Object>> vpi = sourceVertex.properties();
-            while (vpi.hasNext()) {
-                bulkLoader.getOrCreateVertexProperty(vpi.next(), targetVertex, graph, g);
+            if (this.listener.isNewVertex()) {
+                vpi.forEachRemaining(vp -> bulkLoader.createVertexProperty(vp, targetVertex, graph, g));
+            } else {
+                vpi.forEachRemaining(vp -> bulkLoader.getOrCreateVertexProperty(vp, targetVertex, graph, g));
             }
             this.commit(false);
             if (!bulkLoader.useUserSuppliedIds()) {
@@ -221,9 +224,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
         } else if (memory.getIteration() == 1) {
             if (bulkLoader.useUserSuppliedIds()) {
                 final Vertex outV = bulkLoader.getVertex(sourceVertex, graph, g);
+                final boolean incremental = outV.edges(Direction.OUT).hasNext();
                 sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
                     final Vertex inV = bulkLoader.getVertex(edge.inVertex(), graph, g);
-                    bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+                    if (incremental) {
+                        bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+                    } else {
+                        bulkLoader.createEdge(edge, outV, inV, graph, g);
+                    }
                     this.commit(false);
                 });
             } else {
@@ -407,4 +415,84 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             }
         };
     }
+
+    static class BulkLoadingListener implements MutationListener {
+
+        private long counter;
+        private boolean isNewVertex;
+
+        public BulkLoadingListener() {
+            this.counter = 0L;
+            this.isNewVertex = false;
+            ;
+        }
+
+        public boolean isNewVertex() {
+            return this.isNewVertex;
+        }
+
+        public long mutations() {
+            return this.counter;
+        }
+
+        public void resetStats() {
+            this.isNewVertex = false;
+        }
+
+        public void resetCounter() {
+            this.counter = 0L;
+        }
+
+        @Override
+        public void vertexAdded(final Vertex vertex) {
+            this.isNewVertex = true;
+            this.counter++;
+        }
+
+        @Override
+        public void vertexRemoved(final Vertex vertex) {
+            this.counter++;
+        }
+
+        @Override
+        public void vertexPropertyChanged(final Vertex element, final Property oldValue, final Object setValue,
+                                          final Object... vertexPropertyKeyValues) {
+            this.counter++;
+        }
+
+        @Override
+        public void vertexPropertyRemoved(final VertexProperty vertexProperty) {
+            this.counter++;
+        }
+
+        @Override
+        public void edgeAdded(final Edge edge) {
+            this.counter++;
+        }
+
+        @Override
+        public void edgeRemoved(final Edge edge) {
+            this.counter++;
+        }
+
+        @Override
+        public void edgePropertyChanged(final Edge element, final Property oldValue, final Object setValue) {
+            this.counter++;
+        }
+
+        @Override
+        public void edgePropertyRemoved(final Edge element, final Property property) {
+            this.counter++;
+        }
+
+        @Override
+        public void vertexPropertyPropertyChanged(final VertexProperty element, final Property oldValue, final Object setValue) {
+            this.counter++;
+        }
+
+        @Override
+        public void vertexPropertyPropertyRemoved(final VertexProperty element, final Property property) {
+            this.counter++;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
index f03cd18..8219515 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -51,8 +51,10 @@ public class IncrementalBulkLoader implements BulkLoader {
         return iterator.hasNext()
                 ? iterator.next()
                 : useUserSuppliedIds()
-                ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
-                : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
+                ? g.addV(vertex.label()).property(T.id, vertex.id()).next()
+                : g.addV(vertex.label()).property(getVertexIdProperty(), vertex.id()).next();
+        //? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
+        //: graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
     }
 
     /**
@@ -71,8 +73,7 @@ public class IncrementalBulkLoader implements BulkLoader {
                 }
             });
         } else {
-            e = outVertex.addEdge(edge.label(), inVertex);
-            edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+            e = createEdge(edge, outVertex, inVertex, graph, g);
         }
         return e;
     }
@@ -84,7 +85,10 @@ public class IncrementalBulkLoader implements BulkLoader {
     public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
         final VertexProperty<?> vp;
         final VertexProperty<?> existing = vertex.property(property.key());
-        if (!existing.isPresent() || !existing.value().equals(property.value())) {
+        if (!existing.isPresent()) {
+            return createVertexProperty(property, vertex, graph, g);
+        }
+        if (!existing.value().equals(property.value())) {
             vp = vertex.property(property.key(), property.value());
         } else {
             vp = existing;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
index 06994e2..e7c54ce 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.tinkergraph.structure;
 
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
@@ -30,10 +31,12 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.graphml.GraphMLIo;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Supplier;
@@ -192,20 +195,19 @@ public class TinkerGraphPlayTest {
     @Test
     @Ignore
     public void testPlayDK() throws Exception {
-        final Graph graph = TinkerFactory.createModern();
-        final GraphTraversalSource g = graph.traversal();
-        Traversal traversal = g.V().where(out().and().in()).profile().cap(TraversalMetrics.METRICS_KEY);
-        //traversal.forEachRemaining(System.out::println);
-        System.out.println(traversal.toString());
-        traversal.asAdmin().applyStrategies();
-        System.out.println(traversal.toString());
-        traversal.forEachRemaining(System.out::println);
-        traversal = g.V().where(and(out(), in())).profile().cap(TraversalMetrics.METRICS_KEY);
-        //traversal.forEachRemaining(System.out::println);
-        System.out.println(traversal.toString());
-        traversal.asAdmin().applyStrategies();
-        System.out.println(traversal.toString());
-        //System.out.println(traversal.toString());
+
+        new File("/tmp/tinkergraph2.kryo").deleteOnExit();
+        new File("/tmp/tinkergraph3.kryo").deleteOnExit();
+
+        final Graph graph1 = TinkerFactory.createModern();
+        final Graph graph2 = GraphFactory.open("/tmp/graph2.properties");
+        TinkerFactory.generateModern((TinkerGraph) graph2);
+        graph2.close();
+
+        System.out.println("graph1 -> graph2");
+        graph1.compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph("/tmp/graph2.properties").create(graph1)).submit().get();
+        System.out.println("graph1 -> graph3");
+        graph1.compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph("/tmp/graph3.properties").create(graph1)).submit().get();
     }
 
     @Test