You are viewing a plain-text version of this content. The canonical link to it appeared as a "here" hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/09/01 16:26:25 UTC

[09/19] incubator-tinkerpop git commit: cleaned up the BLVP configuration system and added a new configuration option to support intermediate commits (experimental)

Cleaned up the BulkLoaderVertexProgram (BLVP) configuration system and added a new, experimental configuration option to support intermediate commits.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/abb9a73b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/abb9a73b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/abb9a73b

Branch: refs/heads/tp30
Commit: abb9a73b6aa247ac6a2911563a27a3fef1da0d8a
Parents: cf088c4
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Aug 27 19:38:17 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Thu Aug 27 19:38:17 2015 +0200

----------------------------------------------------------------------
 .../computer/bulkloading/BulkLoader.java        |   9 +-
 .../bulkloading/BulkLoaderVertexProgram.java    | 177 ++++++++++---------
 .../computer/bulkloading/DefaultBulkLoader.java |  24 ++-
 .../bulkloading/IncrementalBulkLoader.java      |   2 +-
 4 files changed, 121 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index 268c0cc..5619971 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -74,7 +74,7 @@ public interface BulkLoader {
     public default Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
         return useUserSuppliedIds()
                 ? getVertexById(vertex.id(), graph, g)
-                : g.V().has(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id()).next();
+                : g.V().has(getVertexIdProperty(), vertex.id()).next();
     }
 
     /**
@@ -102,6 +102,13 @@ public interface BulkLoader {
     }
 
     /**
+     * @return The name of the vertex property that is used to store the original vertex id.
+     */
+    public default String getVertexIdProperty() {
+        return BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
+    }
+
+    /**
      * Configures the BulkLoader instance.
      *
      * @param configuration The BulkLoader configuration.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 65f6697..67d8da0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
 
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
@@ -39,14 +40,13 @@ import org.javatuples.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
@@ -55,12 +55,12 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkLoaderVertexProgram.class);
 
-    private static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "bulkloader.conf";
-    private static final String BULK_LOADER_CLASS = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".class";
-    private static final String BULK_LOADER_GRAPH_CFG_PREFIX = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".graph";
-    private static final String BULK_LOADER_CFG_PREFIX = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".loader";
-
-    public static final String BULK_LOADER_VERTEX_ID = "bulkloader.vertex-id";
+    public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoading";
+    private static final String GRAPH_CFG_KEY = "graph";
+    private static final String BULK_LOADER_CFG_KEY = "loader";
+    private static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
+    public static final String BULK_LOADER_VERTEX_ID_CFG_KEY = "vertexIdProperty";
+    public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";
 
     private final MessageScope messageScope;
     private final Set<String> elementComputeKeys;
@@ -68,10 +68,67 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     private BulkLoader bulkLoader;
     private Graph graph;
     private GraphTraversalSource g;
+    private Long intermediateBatchSize;
+    private AtomicLong counter = new AtomicLong();
 
     private BulkLoaderVertexProgram() {
         messageScope = MessageScope.Local.of(__::inE);
-        elementComputeKeys = Collections.singleton(BULK_LOADER_VERTEX_ID);
+        elementComputeKeys = new HashSet<>();
+    }
+
+    private Configuration getGraphConfiguration() {
+        final Configuration config = configuration.subset(GRAPH_CFG_KEY);
+        config.setProperty(Graph.GRAPH, config.getString("class"));
+        config.clearProperty("class");
+        return config;
+    }
+
+    private BulkLoader createBulkLoader() {
+        final BulkLoader loader;
+        final Configuration config = configuration.subset(BULK_LOADER_CFG_KEY);
+        if (config.containsKey("class")) {
+            final String className = config.getString("class");
+            config.clearProperty("class");
+            try {
+                final Class<?> bulkLoaderClass = Class.forName(className);
+                loader = (BulkLoader) bulkLoaderClass.getConstructor().newInstance();
+            } catch (ClassNotFoundException e) {
+                LOGGER.error("Unable to find custom bulk loader class: {}", className);
+                throw new IllegalStateException(e);
+            } catch (Exception e) {
+                LOGGER.error("Unable to create an instance of the given bulk loader class: {}", className);
+                throw new IllegalStateException(e);
+            }
+        } else {
+            loader = new DefaultBulkLoader();
+        }
+        loader.configure(config);
+        return loader;
+    }
+
+    private void commit(final boolean force) {
+        if (!force && (intermediateBatchSize == 0L || counter.incrementAndGet() % intermediateBatchSize != 0)) return;
+        if (null != graph) {
+            if (graph.features().graph().supportsTransactions()) {
+                LOGGER.info("Committing transaction on Graph instance: {}", graph);
+                try {
+                    graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
+                    LOGGER.debug("Committed transaction on Graph instance: {}", graph);
+                    counter.set(0L);
+                } catch (Exception e) {
+                    LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
+                    graph.tx().rollback();
+                    throw e;
+                }
+            }
+            try {
+                graph.close();
+                LOGGER.info("Closed Graph instance: {}", graph);
+                graph = null;
+            } catch (Exception e) {
+                LOGGER.warn("Failed to close Graph instance", e);
+            }
+        }
     }
 
     @Override
@@ -83,51 +140,42 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     public void loadState(final Graph graph, final Configuration config) {
         configuration = new BaseConfiguration();
         if (config != null) {
-            final Iterator<String> keys = config.getKeys();
-            while (keys.hasNext()) {
-                final String key = keys.next();
-                configuration.setProperty(key, config.getProperty(key));
+            ConfigurationUtils.copy(config, configuration);
+        }
+        if (!configuration.subset(BULK_LOADER_CFG_KEY).containsKey(BULK_LOADER_VERTEX_ID_CFG_KEY)) {
+            configuration.addProperty(
+                    String.join(".", BULK_LOADER_CFG_KEY, BULK_LOADER_VERTEX_ID_CFG_KEY),
+                    DEFAULT_BULK_LOADER_VERTEX_ID);
+        }
+        intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
+        final String bulkLoaderVertexIdProperty = configuration.subset(BULK_LOADER_CFG_KEY)
+                .getString(BULK_LOADER_VERTEX_ID_CFG_KEY);
+        if (!elementComputeKeys.contains(bulkLoaderVertexIdProperty)) {
+            synchronized (elementComputeKeys) {
+                elementComputeKeys.add(bulkLoaderVertexIdProperty);
             }
         }
-        final Configuration blvpConfiguration = graph.configuration().subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX);
-        blvpConfiguration.getKeys().forEachRemaining(key -> configuration.setProperty(key, blvpConfiguration.getProperty(key)));
     }
 
     @Override
     public void storeState(final Configuration config) {
         VertexProgram.super.storeState(config);
         if (configuration != null) {
-            configuration.getKeys().forEachRemaining(key -> config.setProperty(key, configuration.getProperty(key)));
+            ConfigurationUtils.copy(configuration, config);
         }
     }
 
     @Override
     public void workerIterationStart(final Memory memory) {
         if (null == graph) {
-            graph = GraphFactory.open(configuration.subset(BULK_LOADER_GRAPH_CFG_PREFIX));
+            graph = GraphFactory.open(getGraphConfiguration());
             LOGGER.info("Opened Graph instance: {}", graph);
             try {
                 if (!graph.features().graph().supportsConcurrentAccess()) {
                     throw new IllegalStateException("The given graph instance does not allow concurrent access.");
                 }
-                if (graph.features().graph().supportsTransactions()) {
-                    if (!graph.features().graph().supportsThreadedTransactions()) {
-                        throw new IllegalStateException("The given graph instance does not support threaded transactions.");
-                    }
-                }
                 g = graph.traversal();
-                final String bulkLoaderClassName = configuration.getString(BULK_LOADER_CLASS, DefaultBulkLoader.class.getCanonicalName());
-                try {
-                    final Class<?> bulkLoaderClass = Class.forName(bulkLoaderClassName);
-                    bulkLoader = (BulkLoader) bulkLoaderClass.getConstructor().newInstance();
-                } catch (ClassNotFoundException e) {
-                    LOGGER.error("Unable to find custom bulk loader class: {}", bulkLoaderClassName);
-                    throw new IllegalStateException(e);
-                } catch (Exception e) {
-                    LOGGER.error("Unable to create an instance of the given bulk loader class: {}", bulkLoaderClassName);
-                    throw new IllegalStateException(e);
-                }
-                bulkLoader.configure(configuration.subset(BULK_LOADER_CFG_PREFIX));
+                bulkLoader = createBulkLoader();
             } catch (Exception e) {
                 try {
                     graph.close();
@@ -143,26 +191,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public void workerIterationEnd(final Memory memory) {
-        if (null != graph) {
-            if (graph.features().graph().supportsTransactions()) {
-                LOGGER.info("Committing transaction on Graph instance: {}", graph);
-                try {
-                    graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
-                    LOGGER.debug("Committed transaction on Graph instance: {}", graph);
-                } catch (Exception e) {
-                    LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
-                    graph.tx().rollback();
-                    throw e;
-                }
-            }
-            try {
-                graph.close();
-                LOGGER.info("Closed Graph instance: {}", graph);
-                graph = null;
-            } catch (Exception e) {
-                LOGGER.warn("Failed to close Graph instance", e);
-            }
-        }
+        this.commit(true);
     }
 
     @Override
@@ -186,10 +215,11 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             while (vpi.hasNext()) {
                 bulkLoader.getOrCreateVertexProperty(vpi.next(), targetVertex, graph, g);
             }
+            this.commit(false);
             if (!bulkLoader.useUserSuppliedIds()) {
                 // create an id pair and send it to all the vertex's incoming adjacent vertices
-                sourceVertex.property(BULK_LOADER_VERTEX_ID, targetVertex.id());
-                messenger.sendMessage(this.messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
+                sourceVertex.property(bulkLoader.getVertexIdProperty(), targetVertex.id());
+                messenger.sendMessage(messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
             }
         } else {
             if (bulkLoader.useUserSuppliedIds()) {
@@ -207,13 +237,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
                     idPairs.put(idPair.getValue(0), idPair.getValue(1));
                 }
                 // get the vertex given the dummy id property
-                final Long outVId = sourceVertex.value(BULK_LOADER_VERTEX_ID);
+                final Long outVId = sourceVertex.value(bulkLoader.getVertexIdProperty());
                 final Vertex outV = bulkLoader.getVertexById(outVId, graph, g);
                 // for all the incoming edges of the vertex, get the incoming adjacent vertex and write the edge and its properties
                 sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
                     final Object inVId = idPairs.get(edge.inVertex().id());
                     final Vertex inV = bulkLoader.getVertexById(inVId, graph, g);
                     bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+                    this.commit(false);
                 });
             }
         }
@@ -231,9 +262,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
 
     @Override
     public Set<MessageScope> getMessageScopes(final Memory memory) {
-        return new HashSet<MessageScope>() {{
-            add(messageScope);
-        }};
+        return Collections.singleton(messageScope);
     }
 
     @Override
@@ -266,29 +295,11 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             super(BulkLoaderVertexProgram.class);
         }
 
-        public Builder graphConfiguration(final String propertiesFileLocation) {
-            try {
-                final Properties properties = new Properties();
-                properties.load(new FileInputStream(propertiesFileLocation));
-                properties.forEach((key, value) -> configuration.setProperty(BULK_LOADER_GRAPH_CFG_PREFIX + "." + key, value));
-                return this;
-            } catch (final Exception e) {
-                throw new IllegalArgumentException(e.getMessage(), e);
-            }
-        }
-
-        public Builder bulkLoaderClass(final Class<? extends BulkLoader> bulkLoaderClass) {
-            return bulkLoaderClass(bulkLoaderClass, null);
-        }
-
-        public Builder bulkLoaderClass(final Class<? extends BulkLoader> bulkLoaderClass, final Configuration configuration) {
-            this.configuration.setProperty(BULK_LOADER_CLASS, bulkLoaderClass.getCanonicalName());
-            if (configuration != null) {
-                configuration.getKeys().forEachRemaining(key -> this.configuration.addProperty(
-                        BULK_LOADER_CFG_PREFIX + "." + key, configuration.getProperty(key)
-                ));
-            }
-            return this;
+        @SuppressWarnings("unchecked")
+        @Override
+        public BulkLoaderVertexProgram create(final Graph graph) {
+            ConfigurationUtils.append(graph.configuration().subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX), configuration);
+            return (BulkLoaderVertexProgram) VertexProgram.createVertexProgram(graph, this.configuration);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
index 601079d..029d379 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
@@ -31,9 +31,10 @@ import org.apache.tinkerpop.gremlin.structure.VertexProperty;
  */
 public class DefaultBulkLoader implements BulkLoader {
 
-    public final static String USE_USER_SUPPLIED_IDS_CFG_KEY = "use-user-supplied-ids";
-    public final static String STORE_ORIGINAL_IDS_CFG_KEY = "store-original-ids";
+    public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
+    public final static String STORE_ORIGINAL_IDS_CFG_KEY = "storeOriginalIds";
 
+    private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
     private boolean storeOriginalIds = false;
     private boolean useUserSuppliedIds = false;
 
@@ -47,7 +48,7 @@ public class DefaultBulkLoader implements BulkLoader {
         }
         final Vertex v = graph.addVertex(T.label, vertex.label());
         if (storeOriginalIds()) {
-            v.property(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id());
+            v.property(bulkLoaderVertexId, vertex.id());
         }
         return v;
     }
@@ -79,7 +80,7 @@ public class DefaultBulkLoader implements BulkLoader {
     public Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
         return useUserSuppliedIds()
                 ? getVertexById(vertex.id(), graph, g)
-                : g.V().has(vertex.label(), BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id()).next();
+                : g.V().has(vertex.label(), bulkLoaderVertexId, vertex.id()).next();
     }
 
     /**
@@ -102,9 +103,20 @@ public class DefaultBulkLoader implements BulkLoader {
      * {@inheritDoc}
      */
     @Override
+    public String getVertexIdProperty() {
+        return bulkLoaderVertexId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void configure(final Configuration configuration) {
-        if (configuration.containsKey(USE_USER_SUPPLIED_IDS_CFG_KEY)) {
-            useUserSuppliedIds = configuration.getBoolean(USE_USER_SUPPLIED_IDS_CFG_KEY);
+        if (configuration.containsKey(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY)) {
+            bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
+        }
+        if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
+            useUserSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
         }
         if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
             storeOriginalIds = configuration.getBoolean(STORE_ORIGINAL_IDS_CFG_KEY);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
index d87df73..c4e0737 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -44,7 +44,7 @@ public class IncrementalBulkLoader extends DefaultBulkLoader {
     public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
         final Iterator<Vertex> iterator = useUserSuppliedIds()
                 ? graph.vertices(vertex.id())
-                : g.V().has(vertex.label(), BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id());
+                : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
         return iterator.hasNext() ? iterator.next() : super.getOrCreateVertex(vertex, graph, g);
     }