You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/08/30 14:48:56 UTC

[1/3] incubator-tinkerpop git commit: Big cleanup after testing failure scenarios.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/blvp 4978e331b -> 9ec8c96da


Big cleanup after testing failure scenarios.

Got rid of DefaultBulkLoader. Renamed IncrementalBulkLoader to DefaultBulkLoader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9f467c4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9f467c4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9f467c4c

Branch: refs/heads/blvp
Commit: 9f467c4cf8c78d28bbf914e9465c045be3b558ce
Parents: 4978e33
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Sat Aug 29 03:17:20 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Sat Aug 29 03:17:20 2015 +0200

----------------------------------------------------------------------
 .../computer/bulkloading/BulkLoader.java        |   6 +-
 .../bulkloading/BulkLoaderVertexProgram.java    |  39 ++++---
 .../computer/bulkloading/DefaultBulkLoader.java |  70 ++++++++----
 .../bulkloading/IncrementalBulkLoader.java      | 112 -------------------
 4 files changed, 76 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index 5619971..dd5eaf4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -95,11 +95,9 @@ public interface BulkLoader {
     public boolean useUserSuppliedIds();
 
     /**
-     * @return Whether original vertex identifiers are stored in the target graph or not.
+     * @return Whether to keep the original vertex identifiers in the target graph or not.
      */
-    public default boolean storeOriginalIds() {
-        return !useUserSuppliedIds();
-    }
+    public boolean keepOriginalIds();
 
     /**
      * @return The name of the vertex property that is used to store the original vertex id.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 2f36a6e..b8219fb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -54,10 +54,10 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkLoaderVertexProgram.class);
 
-    public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoading";
-    private static final String GRAPH_CFG_KEY = "graph";
-    private static final String BULK_LOADER_CFG_KEY = "loader";
-    private static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
+    public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoaderVertexProgram";
+    public static final String GRAPH_CFG_KEY = "graph";
+    public static final String BULK_LOADER_CFG_KEY = "loader";
+    public static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
     public static final String BULK_LOADER_VERTEX_ID_CFG_KEY = "vertexIdProperty";
     public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";
 
@@ -117,9 +117,8 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             if (graph.features().graph().supportsTransactions()) {
                 LOGGER.info("Committing transaction on Graph instance: {}", graph);
                 try {
-                    graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
+                    graph.tx().commit();
                     LOGGER.debug("Committed transaction on Graph instance: {}", graph);
-                    counter = 0L;
                 } catch (Exception e) {
                     LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
                     graph.tx().rollback();
@@ -140,7 +139,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public void setup(final Memory memory) {
-
     }
 
     @Override
@@ -156,6 +154,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
         }
         intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
         elementComputeKeys.add(configuration.subset(BULK_LOADER_CFG_KEY).getString(BULK_LOADER_VERTEX_ID_CFG_KEY));
+        bulkLoader = createBulkLoader();
     }
 
     @Override
@@ -176,7 +175,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     throw new IllegalStateException("The given graph instance does not allow concurrent access.");
                 }
                 g = graph.traversal();
-                bulkLoader = createBulkLoader();
             } catch (Exception e) {
                 try {
                     graph.close();
@@ -222,12 +220,13 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                 sourceVertex.property(bulkLoader.getVertexIdProperty(), targetVertex.id());
                 messenger.sendMessage(messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
             }
-        } else {
+        } else if (memory.getIteration() == 1) {
             if (bulkLoader.useUserSuppliedIds()) {
                 final Vertex outV = bulkLoader.getVertex(sourceVertex, graph, g);
                 sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
                     final Vertex inV = bulkLoader.getVertex(edge.inVertex(), graph, g);
                     bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+                    this.commit(false);
                 });
             } else {
                 // create an id map and populate it with all the incoming messages
@@ -237,7 +236,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     final Tuple idPair = idi.next();
                     idPairs.put(idPair.getValue(0), idPair.getValue(1));
                 }
-                // get the vertex given the dummy id property
+                // get the vertex with given the dummy id property
                 final Long outVId = sourceVertex.value(bulkLoader.getVertexIdProperty());
                 final Vertex outV = bulkLoader.getVertexById(outVId, graph, g);
                 // for all the incoming edges of the vertex, get the incoming adjacent vertex and write the edge and its properties
@@ -248,12 +247,23 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     this.commit(false);
                 });
             }
+        } else if (memory.getIteration() == 2) {
+            final Long vertexId = sourceVertex.value(bulkLoader.getVertexIdProperty());
+            bulkLoader.getVertexById(vertexId, graph, g)
+                    .property(bulkLoader.getVertexIdProperty()).remove();
+            this.commit(false);
         }
     }
 
     @Override
     public boolean terminate(final Memory memory) {
-        return memory.getIteration() >= 1;
+        switch (memory.getIteration()) {
+            case 1:
+                return bulkLoader.keepOriginalIds();
+            case 2:
+                return true;
+        }
+        return false;
     }
 
     @Override
@@ -266,6 +276,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
         return Collections.singleton(messageScope);
     }
 
+    @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "CloneDoesntCallSuperClone"})
     @Override
     public VertexProgram<Tuple> clone() {
         return this;
@@ -273,17 +284,17 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public GraphComputer.ResultGraph getPreferredResultGraph() {
-        return GraphComputer.ResultGraph.NEW;
+        return GraphComputer.ResultGraph.ORIGINAL;
     }
 
     @Override
     public GraphComputer.Persist getPreferredPersist() {
-        return GraphComputer.Persist.EDGES;
+        return GraphComputer.Persist.NOTHING;
     }
 
     @Override
     public String toString() {
-        return StringFactory.vertexProgramString(this, "");
+        return StringFactory.vertexProgramString(this, bulkLoader != null ? bulkLoader.getClass().getSimpleName() : null);
     }
 
     public static Builder build() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
index 029d379..75d3494 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
@@ -19,38 +19,43 @@
 package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 
+import java.util.Iterator;
+
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
  */
 public class DefaultBulkLoader implements BulkLoader {
 
     public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
-    public final static String STORE_ORIGINAL_IDS_CFG_KEY = "storeOriginalIds";
+    public final static String KEEP_ORIGINAL_IDS_CFG_KEY = "keepOriginalIds";
 
     private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
-    private boolean storeOriginalIds = false;
-    private boolean useUserSuppliedIds = false;
+    private boolean keepOriginalIds = true;
+    private boolean userSuppliedIds = false;
 
     /**
      * {@inheritDoc}
      */
     @Override
     public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        if (useUserSuppliedIds()) {
-            return graph.addVertex(T.id, vertex.id(), T.label, vertex.label());
-        }
-        final Vertex v = graph.addVertex(T.label, vertex.label());
-        if (storeOriginalIds()) {
-            v.property(bulkLoaderVertexId, vertex.id());
-        }
-        return v;
+        final Iterator<Vertex> iterator = useUserSuppliedIds()
+                ? graph.vertices(vertex.id())
+                : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
+        return iterator.hasNext()
+                ? iterator.next()
+                : useUserSuppliedIds()
+                ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
+                : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
     }
 
     /**
@@ -58,8 +63,20 @@ public class DefaultBulkLoader implements BulkLoader {
      */
     @Override
     public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
-        final Edge e = outVertex.addEdge(edge.label(), inVertex);
-        edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+        final Edge e;
+        final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
+        if (t.hasNext()) {
+            e = t.next();
+            edge.properties().forEachRemaining(property -> {
+                final Property<?> existing = e.property(property.key());
+                if (!existing.isPresent() || !existing.value().equals(property.value())) {
+                    e.property(property.key(), property.value());
+                }
+            });
+        } else {
+            e = outVertex.addEdge(edge.label(), inVertex);
+            edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+        }
         return e;
     }
 
@@ -68,8 +85,19 @@ public class DefaultBulkLoader implements BulkLoader {
      */
     @Override
     public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        final VertexProperty<?> vp = vertex.property(property.key(), property.value());
-        vp.properties().forEachRemaining(metaProperty -> property.property(metaProperty.key(), metaProperty.value()));
+        final VertexProperty<?> vp;
+        final VertexProperty<?> existing = vertex.property(property.key());
+        if (!existing.isPresent() || !existing.value().equals(property.value())) {
+            vp = vertex.property(property.key(), property.value());
+        } else {
+            vp = existing;
+        }
+        property.properties().forEachRemaining(metaProperty -> {
+            final Property<?> existing2 = vp.property(metaProperty.key());
+            if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
+                vp.property(metaProperty.key(), metaProperty.value());
+            }
+        });
         return vp;
     }
 
@@ -88,15 +116,15 @@ public class DefaultBulkLoader implements BulkLoader {
      */
     @Override
     public boolean useUserSuppliedIds() {
-        return useUserSuppliedIds;
+        return userSuppliedIds;
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public boolean storeOriginalIds() {
-        return storeOriginalIds;
+    public boolean keepOriginalIds() {
+        return keepOriginalIds;
     }
 
     /**
@@ -116,10 +144,10 @@ public class DefaultBulkLoader implements BulkLoader {
             bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
         }
         if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
-            useUserSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
+            userSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
         }
-        if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
-            storeOriginalIds = configuration.getBoolean(STORE_ORIGINAL_IDS_CFG_KEY);
+        if (configuration.containsKey(KEEP_ORIGINAL_IDS_CFG_KEY)) {
+            keepOriginalIds = configuration.getBoolean(KEEP_ORIGINAL_IDS_CFG_KEY);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
deleted file mode 100644
index 7822c2d..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-
-/**
- * @author Daniel Kuppitz (http://gremlin.guru)
- */
-public class IncrementalBulkLoader extends DefaultBulkLoader {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalBulkLoader.class);
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        final Iterator<Vertex> iterator = useUserSuppliedIds()
-                ? graph.vertices(vertex.id())
-                : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
-        return iterator.hasNext() ? iterator.next() : super.getOrCreateVertex(vertex, graph, g);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
-        final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
-        if (t.hasNext()) {
-            final Edge e = t.next();
-            edge.properties().forEachRemaining(property -> {
-                final Property<?> existing = e.property(property.key());
-                if (!existing.isPresent() || !existing.value().equals(property.value())) {
-                    e.property(property.key(), property.value());
-                }
-            });
-            return e;
-        }
-        return super.getOrCreateEdge(edge, outVertex, inVertex, graph, g);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        final VertexProperty<?> vp;
-        final VertexProperty<?> existing = vertex.property(property.key());
-        if (!existing.isPresent() || !existing.value().equals(property.value())) {
-            vp = vertex.property(property.key(), property.value());
-        } else {
-            vp = existing;
-        }
-        property.properties().forEachRemaining(metaProperty -> {
-            final Property<?> existing2 = vp.property(metaProperty.key());
-            if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
-                vp.property(metaProperty.key(), metaProperty.value());
-            }
-        });
-        return vp;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean storeOriginalIds() {
-        return !useUserSuppliedIds();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void configure(final Configuration configuration) {
-        super.configure(configuration);
-        if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
-            LOGGER.warn("{} automatically determines whether original identifiers should be stored or not, hence the " +
-                    "configuration setting '{}' will be ignored.", this.getClass(), STORE_ORIGINAL_IDS_CFG_KEY);
-        }
-    }
-}


[3/3] incubator-tinkerpop git commit: tweaked the commit logs (added a counter for processed elements per iteration per thread)

Posted by dk...@apache.org.
tweaked the commit logs (added a counter for processed elements per iteration per thread)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9ec8c96d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9ec8c96d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9ec8c96d

Branch: refs/heads/blvp
Commit: 9ec8c96dad4d68463386b272815553f640d93ea9
Parents: 9cc985d
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Sun Aug 30 14:48:36 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Sun Aug 30 14:48:36 2015 +0200

----------------------------------------------------------------------
 .../bulkloading/BulkLoaderVertexProgram.java        | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9ec8c96d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index b8219fb..9ab21c2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
@@ -68,7 +69,12 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     private Graph graph;
     private GraphTraversalSource g;
     private long intermediateBatchSize;
-    private long counter;
+    private static ThreadLocal<AtomicLong> counter = new ThreadLocal<AtomicLong>() {
+        @Override
+        protected AtomicLong initialValue() {
+            return new AtomicLong();
+        }
+    };
 
     private BulkLoaderVertexProgram() {
         messageScope = MessageScope.Local.of(__::inE);
@@ -99,7 +105,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                 throw new IllegalStateException(e);
             }
         } else {
-            loader = new DefaultBulkLoader();
+            loader = new IncrementalBulkLoader();
         }
         loader.configure(config);
         return loader;
@@ -112,10 +118,11 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
      * @param close Whether to close the current graph instance after calling commit() or not.
      */
     private void commit(final boolean close) {
-        if (!close && (intermediateBatchSize == 0L || ++counter % intermediateBatchSize != 0)) return;
+        if (!close && (counter.get().incrementAndGet() % intermediateBatchSize != 0 || intermediateBatchSize == 0L))
+            return;
         if (null != graph) {
             if (graph.features().graph().supportsTransactions()) {
-                LOGGER.info("Committing transaction on Graph instance: {}", graph);
+                LOGGER.info("Committing transaction on Graph instance: {} [{}]", graph, counter.get().get());
                 try {
                     graph.tx().commit();
                     LOGGER.debug("Committed transaction on Graph instance: {}", graph);
@@ -139,6 +146,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public void setup(final Memory memory) {
+        counter.get().set(0L);
     }
 
     @Override


[2/3] incubator-tinkerpop git commit: Renamed DefaultBulkLoader to IncrementalBulkLoader as requested by @okram.

Posted by dk...@apache.org.
Renamed DefaultBulkLoader to IncrementalBulkLoader as requested by @okram.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9cc985da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9cc985da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9cc985da

Branch: refs/heads/blvp
Commit: 9cc985da58ac17eda88f4347310f03d8a69883bc
Parents: 9f467c4
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Sun Aug 30 14:46:31 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Sun Aug 30 14:46:31 2015 +0200

----------------------------------------------------------------------
 .../computer/bulkloading/DefaultBulkLoader.java | 153 -------------------
 .../bulkloading/IncrementalBulkLoader.java      | 153 +++++++++++++++++++
 2 files changed, 153 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9cc985da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
deleted file mode 100644
index 75d3494..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-
-import java.util.Iterator;
-
-/**
- * @author Daniel Kuppitz (http://gremlin.guru)
- */
-public class DefaultBulkLoader implements BulkLoader {
-
-    public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
-    public final static String KEEP_ORIGINAL_IDS_CFG_KEY = "keepOriginalIds";
-
-    private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
-    private boolean keepOriginalIds = true;
-    private boolean userSuppliedIds = false;
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        final Iterator<Vertex> iterator = useUserSuppliedIds()
-                ? graph.vertices(vertex.id())
-                : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
-        return iterator.hasNext()
-                ? iterator.next()
-                : useUserSuppliedIds()
-                ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
-                : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
-        final Edge e;
-        final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
-        if (t.hasNext()) {
-            e = t.next();
-            edge.properties().forEachRemaining(property -> {
-                final Property<?> existing = e.property(property.key());
-                if (!existing.isPresent() || !existing.value().equals(property.value())) {
-                    e.property(property.key(), property.value());
-                }
-            });
-        } else {
-            e = outVertex.addEdge(edge.label(), inVertex);
-            edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
-        }
-        return e;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        final VertexProperty<?> vp;
-        final VertexProperty<?> existing = vertex.property(property.key());
-        if (!existing.isPresent() || !existing.value().equals(property.value())) {
-            vp = vertex.property(property.key(), property.value());
-        } else {
-            vp = existing;
-        }
-        property.properties().forEachRemaining(metaProperty -> {
-            final Property<?> existing2 = vp.property(metaProperty.key());
-            if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
-                vp.property(metaProperty.key(), metaProperty.value());
-            }
-        });
-        return vp;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
-        return useUserSuppliedIds()
-                ? getVertexById(vertex.id(), graph, g)
-                : g.V().has(vertex.label(), bulkLoaderVertexId, vertex.id()).next();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean useUserSuppliedIds() {
-        return userSuppliedIds;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean keepOriginalIds() {
-        return keepOriginalIds;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String getVertexIdProperty() {
-        return bulkLoaderVertexId;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void configure(final Configuration configuration) {
-        if (configuration.containsKey(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY)) {
-            bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
-        }
-        if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
-            userSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
-        }
-        if (configuration.containsKey(KEEP_ORIGINAL_IDS_CFG_KEY)) {
-            keepOriginalIds = configuration.getBoolean(KEEP_ORIGINAL_IDS_CFG_KEY);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9cc985da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
new file mode 100644
index 0000000..2177d0b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+
+import java.util.Iterator;
+
+/**
+ * @author Daniel Kuppitz (http://gremlin.guru)
+ */
+public class IncrementalBulkLoader implements BulkLoader {
+
+    public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
+    public final static String KEEP_ORIGINAL_IDS_CFG_KEY = "keepOriginalIds";
+
+    private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
+    private boolean keepOriginalIds = true;
+    private boolean userSuppliedIds = false;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+        final Iterator<Vertex> iterator = useUserSuppliedIds()
+                ? graph.vertices(vertex.id())
+                : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
+        return iterator.hasNext()
+                ? iterator.next()
+                : useUserSuppliedIds()
+                ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
+                : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
+        final Edge e;
+        final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
+        if (t.hasNext()) {
+            e = t.next();
+            edge.properties().forEachRemaining(property -> {
+                final Property<?> existing = e.property(property.key());
+                if (!existing.isPresent() || !existing.value().equals(property.value())) {
+                    e.property(property.key(), property.value());
+                }
+            });
+        } else {
+            e = outVertex.addEdge(edge.label(), inVertex);
+            edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+        }
+        return e;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+        final VertexProperty<?> vp;
+        final VertexProperty<?> existing = vertex.property(property.key());
+        if (!existing.isPresent() || !existing.value().equals(property.value())) {
+            vp = vertex.property(property.key(), property.value());
+        } else {
+            vp = existing;
+        }
+        property.properties().forEachRemaining(metaProperty -> {
+            final Property<?> existing2 = vp.property(metaProperty.key());
+            if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
+                vp.property(metaProperty.key(), metaProperty.value());
+            }
+        });
+        return vp;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+        return useUserSuppliedIds()
+                ? getVertexById(vertex.id(), graph, g)
+                : g.V().has(vertex.label(), bulkLoaderVertexId, vertex.id()).next();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean useUserSuppliedIds() {
+        return userSuppliedIds;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean keepOriginalIds() {
+        return keepOriginalIds;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String getVertexIdProperty() {
+        return bulkLoaderVertexId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void configure(final Configuration configuration) {
+        if (configuration.containsKey(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY)) {
+            bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
+        }
+        if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
+            userSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
+        }
+        if (configuration.containsKey(KEEP_ORIGINAL_IDS_CFG_KEY)) {
+            keepOriginalIds = configuration.getBoolean(KEEP_ORIGINAL_IDS_CFG_KEY);
+        }
+    }
+}