You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/08/27 23:58:58 UTC

[1/2] incubator-tinkerpop git commit: removed unnecessary code to make BLVP thread safe

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/blvp abb9a73b6 -> 4978e331b


removed unnecessary code to make BLVP thread safe


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/99a85aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/99a85aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/99a85aee

Branch: refs/heads/blvp
Commit: 99a85aee46535e2f26f24178153e9b3e207ba1a2
Parents: abb9a73
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Aug 27 20:03:10 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Thu Aug 27 20:03:10 2015 +0200

----------------------------------------------------------------------
 .../bulkloading/BulkLoaderVertexProgram.java       | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/99a85aee/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 67d8da0..bfa88fd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -46,7 +46,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
@@ -68,8 +67,8 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     private BulkLoader bulkLoader;
     private Graph graph;
     private GraphTraversalSource g;
-    private Long intermediateBatchSize;
-    private AtomicLong counter = new AtomicLong();
+    private long intermediateBatchSize;
+    private long counter;
 
     private BulkLoaderVertexProgram() {
         messageScope = MessageScope.Local.of(__::inE);
@@ -107,14 +106,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     }
 
     private void commit(final boolean force) {
-        if (!force && (intermediateBatchSize == 0L || counter.incrementAndGet() % intermediateBatchSize != 0)) return;
+        if (!force && (intermediateBatchSize == 0L || ++counter % intermediateBatchSize != 0)) return;
         if (null != graph) {
             if (graph.features().graph().supportsTransactions()) {
                 LOGGER.info("Committing transaction on Graph instance: {}", graph);
                 try {
                     graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
                     LOGGER.debug("Committed transaction on Graph instance: {}", graph);
-                    counter.set(0L);
+                    counter = 0L;
                 } catch (Exception e) {
                     LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
                     graph.tx().rollback();
@@ -148,13 +147,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     DEFAULT_BULK_LOADER_VERTEX_ID);
         }
         intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
-        final String bulkLoaderVertexIdProperty = configuration.subset(BULK_LOADER_CFG_KEY)
-                .getString(BULK_LOADER_VERTEX_ID_CFG_KEY);
-        if (!elementComputeKeys.contains(bulkLoaderVertexIdProperty)) {
-            synchronized (elementComputeKeys) {
-                elementComputeKeys.add(bulkLoaderVertexIdProperty);
-            }
-        }
+        elementComputeKeys.add(configuration.subset(BULK_LOADER_CFG_KEY).getString(BULK_LOADER_VERTEX_ID_CFG_KEY));
     }
 
     @Override


[2/2] incubator-tinkerpop git commit: fixed and optimized IncrementalBulkLoader

Posted by dk...@apache.org.
fixed and optimized IncrementalBulkLoader


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4978e331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4978e331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4978e331

Branch: refs/heads/blvp
Commit: 4978e331bdb339a48be4f42d75f584caab766e94
Parents: 99a85ae
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Aug 27 23:58:23 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Thu Aug 27 23:58:23 2015 +0200

----------------------------------------------------------------------
 .../bulkloading/BulkLoaderVertexProgram.java    | 24 ++++++++++------
 .../bulkloading/IncrementalBulkLoader.java      | 30 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4978e331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index bfa88fd..2f36a6e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -105,8 +105,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
         return loader;
     }
 
-    private void commit(final boolean force) {
-        if (!force && (intermediateBatchSize == 0L || ++counter % intermediateBatchSize != 0)) return;
+    /**
+     * Conditionally commits the current transaction and optionally closes the current graph instance. commit() is
+     * called if close is true; otherwise it is only called if the intermediate batch size is set and reached.
+     *
+     * @param close Whether to close the current graph instance after calling commit() or not.
+     */
+    private void commit(final boolean close) {
+        if (!close && (intermediateBatchSize == 0L || ++counter % intermediateBatchSize != 0)) return;
         if (null != graph) {
             if (graph.features().graph().supportsTransactions()) {
                 LOGGER.info("Committing transaction on Graph instance: {}", graph);
@@ -120,12 +126,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     throw e;
                 }
             }
-            try {
-                graph.close();
-                LOGGER.info("Closed Graph instance: {}", graph);
-                graph = null;
-            } catch (Exception e) {
-                LOGGER.warn("Failed to close Graph instance", e);
+            if (close) {
+                try {
+                    graph.close();
+                    LOGGER.info("Closed Graph instance: {}", graph);
+                    graph = null;
+                } catch (Exception e) {
+                    LOGGER.warn("Failed to close Graph instance", e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4978e331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
index c4e0737..7822c2d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -24,7 +24,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +58,12 @@ public class IncrementalBulkLoader extends DefaultBulkLoader {
         final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
         if (t.hasNext()) {
             final Edge e = t.next();
-            edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+            edge.properties().forEachRemaining(property -> {
+                final Property<?> existing = e.property(property.key());
+                if (!existing.isPresent() || !existing.value().equals(property.value())) {
+                    e.property(property.key(), property.value());
+                }
+            });
             return e;
         }
         return super.getOrCreateEdge(edge, outVertex, inVertex, graph, g);
@@ -66,6 +73,27 @@ public class IncrementalBulkLoader extends DefaultBulkLoader {
      * {@inheritDoc}
      */
     @Override
+    public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+        final VertexProperty<?> vp;
+        final VertexProperty<?> existing = vertex.property(property.key());
+        if (!existing.isPresent() || !existing.value().equals(property.value())) {
+            vp = vertex.property(property.key(), property.value());
+        } else {
+            vp = existing;
+        }
+        property.properties().forEachRemaining(metaProperty -> {
+            final Property<?> existing2 = vp.property(metaProperty.key());
+            if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
+                vp.property(metaProperty.key(), metaProperty.value());
+            }
+        });
+        return vp;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public boolean storeOriginalIds() {
         return !useUserSuppliedIds();
     }