You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/03 17:33:29 UTC
[09/31] incubator-tinkerpop git commit: cleaned up the BLVP
configuration system and added a new configuration option to support
intermediate commits (experimental)
Cleaned up the BulkLoaderVertexProgram (BLVP) configuration system and added a new, experimental configuration option (intermediateBatchSize) to support intermediate commits.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/abb9a73b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/abb9a73b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/abb9a73b
Branch: refs/heads/master
Commit: abb9a73b6aa247ac6a2911563a27a3fef1da0d8a
Parents: cf088c4
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Aug 27 19:38:17 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Thu Aug 27 19:38:17 2015 +0200
----------------------------------------------------------------------
.../computer/bulkloading/BulkLoader.java | 9 +-
.../bulkloading/BulkLoaderVertexProgram.java | 177 ++++++++++---------
.../computer/bulkloading/DefaultBulkLoader.java | 24 ++-
.../bulkloading/IncrementalBulkLoader.java | 2 +-
4 files changed, 121 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index 268c0cc..5619971 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -74,7 +74,7 @@ public interface BulkLoader {
public default Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
return useUserSuppliedIds()
? getVertexById(vertex.id(), graph, g)
- : g.V().has(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id()).next();
+ : g.V().has(getVertexIdProperty(), vertex.id()).next();
}
/**
@@ -102,6 +102,13 @@ public interface BulkLoader {
}
/**
+ * @return The name of the vertex property that is used to store the original vertex id.
+ */
+ public default String getVertexIdProperty() {
+ return BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
+ }
+
+ /**
* Configures the BulkLoader instance.
*
* @param configuration The BulkLoader configuration.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 65f6697..67d8da0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
@@ -39,14 +40,13 @@ import org.javatuples.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileInputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
/**
* @author Daniel Kuppitz (http://gremlin.guru)
@@ -55,12 +55,12 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkLoaderVertexProgram.class);
- private static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "bulkloader.conf";
- private static final String BULK_LOADER_CLASS = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".class";
- private static final String BULK_LOADER_GRAPH_CFG_PREFIX = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".graph";
- private static final String BULK_LOADER_CFG_PREFIX = BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX + ".loader";
-
- public static final String BULK_LOADER_VERTEX_ID = "bulkloader.vertex-id";
+ public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoading";
+ private static final String GRAPH_CFG_KEY = "graph";
+ private static final String BULK_LOADER_CFG_KEY = "loader";
+ private static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
+ public static final String BULK_LOADER_VERTEX_ID_CFG_KEY = "vertexIdProperty";
+ public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";
private final MessageScope messageScope;
private final Set<String> elementComputeKeys;
@@ -68,10 +68,67 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
private BulkLoader bulkLoader;
private Graph graph;
private GraphTraversalSource g;
+ private Long intermediateBatchSize;
+ private AtomicLong counter = new AtomicLong();
private BulkLoaderVertexProgram() {
messageScope = MessageScope.Local.of(__::inE);
- elementComputeKeys = Collections.singleton(BULK_LOADER_VERTEX_ID);
+ elementComputeKeys = new HashSet<>();
+ }
+
+ private Configuration getGraphConfiguration() {
+ final Configuration config = configuration.subset(GRAPH_CFG_KEY);
+ config.setProperty(Graph.GRAPH, config.getString("class"));
+ config.clearProperty("class");
+ return config;
+ }
+
+ private BulkLoader createBulkLoader() {
+ final BulkLoader loader;
+ final Configuration config = configuration.subset(BULK_LOADER_CFG_KEY);
+ if (config.containsKey("class")) {
+ final String className = config.getString("class");
+ config.clearProperty("class");
+ try {
+ final Class<?> bulkLoaderClass = Class.forName(className);
+ loader = (BulkLoader) bulkLoaderClass.getConstructor().newInstance();
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("Unable to find custom bulk loader class: {}", className);
+ throw new IllegalStateException(e);
+ } catch (Exception e) {
+ LOGGER.error("Unable to create an instance of the given bulk loader class: {}", className);
+ throw new IllegalStateException(e);
+ }
+ } else {
+ loader = new DefaultBulkLoader();
+ }
+ loader.configure(config);
+ return loader;
+ }
+
+ private void commit(final boolean force) {
+ if (!force && (intermediateBatchSize == 0L || counter.incrementAndGet() % intermediateBatchSize != 0)) return;
+ if (null != graph) {
+ if (graph.features().graph().supportsTransactions()) {
+ LOGGER.info("Committing transaction on Graph instance: {}", graph);
+ try {
+ graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
+ LOGGER.debug("Committed transaction on Graph instance: {}", graph);
+ counter.set(0L);
+ } catch (Exception e) {
+ LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
+ graph.tx().rollback();
+ throw e;
+ }
+ }
+ try {
+ graph.close();
+ LOGGER.info("Closed Graph instance: {}", graph);
+ graph = null;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close Graph instance", e);
+ }
+ }
}
@Override
@@ -83,51 +140,42 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
public void loadState(final Graph graph, final Configuration config) {
configuration = new BaseConfiguration();
if (config != null) {
- final Iterator<String> keys = config.getKeys();
- while (keys.hasNext()) {
- final String key = keys.next();
- configuration.setProperty(key, config.getProperty(key));
+ ConfigurationUtils.copy(config, configuration);
+ }
+ if (!configuration.subset(BULK_LOADER_CFG_KEY).containsKey(BULK_LOADER_VERTEX_ID_CFG_KEY)) {
+ configuration.addProperty(
+ String.join(".", BULK_LOADER_CFG_KEY, BULK_LOADER_VERTEX_ID_CFG_KEY),
+ DEFAULT_BULK_LOADER_VERTEX_ID);
+ }
+ intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
+ final String bulkLoaderVertexIdProperty = configuration.subset(BULK_LOADER_CFG_KEY)
+ .getString(BULK_LOADER_VERTEX_ID_CFG_KEY);
+ if (!elementComputeKeys.contains(bulkLoaderVertexIdProperty)) {
+ synchronized (elementComputeKeys) {
+ elementComputeKeys.add(bulkLoaderVertexIdProperty);
}
}
- final Configuration blvpConfiguration = graph.configuration().subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX);
- blvpConfiguration.getKeys().forEachRemaining(key -> configuration.setProperty(key, blvpConfiguration.getProperty(key)));
}
@Override
public void storeState(final Configuration config) {
VertexProgram.super.storeState(config);
if (configuration != null) {
- configuration.getKeys().forEachRemaining(key -> config.setProperty(key, configuration.getProperty(key)));
+ ConfigurationUtils.copy(configuration, config);
}
}
@Override
public void workerIterationStart(final Memory memory) {
if (null == graph) {
- graph = GraphFactory.open(configuration.subset(BULK_LOADER_GRAPH_CFG_PREFIX));
+ graph = GraphFactory.open(getGraphConfiguration());
LOGGER.info("Opened Graph instance: {}", graph);
try {
if (!graph.features().graph().supportsConcurrentAccess()) {
throw new IllegalStateException("The given graph instance does not allow concurrent access.");
}
- if (graph.features().graph().supportsTransactions()) {
- if (!graph.features().graph().supportsThreadedTransactions()) {
- throw new IllegalStateException("The given graph instance does not support threaded transactions.");
- }
- }
g = graph.traversal();
- final String bulkLoaderClassName = configuration.getString(BULK_LOADER_CLASS, DefaultBulkLoader.class.getCanonicalName());
- try {
- final Class<?> bulkLoaderClass = Class.forName(bulkLoaderClassName);
- bulkLoader = (BulkLoader) bulkLoaderClass.getConstructor().newInstance();
- } catch (ClassNotFoundException e) {
- LOGGER.error("Unable to find custom bulk loader class: {}", bulkLoaderClassName);
- throw new IllegalStateException(e);
- } catch (Exception e) {
- LOGGER.error("Unable to create an instance of the given bulk loader class: {}", bulkLoaderClassName);
- throw new IllegalStateException(e);
- }
- bulkLoader.configure(configuration.subset(BULK_LOADER_CFG_PREFIX));
+ bulkLoader = createBulkLoader();
} catch (Exception e) {
try {
graph.close();
@@ -143,26 +191,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
@Override
public void workerIterationEnd(final Memory memory) {
- if (null != graph) {
- if (graph.features().graph().supportsTransactions()) {
- LOGGER.info("Committing transaction on Graph instance: {}", graph);
- try {
- graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
- LOGGER.debug("Committed transaction on Graph instance: {}", graph);
- } catch (Exception e) {
- LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
- graph.tx().rollback();
- throw e;
- }
- }
- try {
- graph.close();
- LOGGER.info("Closed Graph instance: {}", graph);
- graph = null;
- } catch (Exception e) {
- LOGGER.warn("Failed to close Graph instance", e);
- }
- }
+ this.commit(true);
}
@Override
@@ -186,10 +215,11 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
while (vpi.hasNext()) {
bulkLoader.getOrCreateVertexProperty(vpi.next(), targetVertex, graph, g);
}
+ this.commit(false);
if (!bulkLoader.useUserSuppliedIds()) {
// create an id pair and send it to all the vertex's incoming adjacent vertices
- sourceVertex.property(BULK_LOADER_VERTEX_ID, targetVertex.id());
- messenger.sendMessage(this.messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
+ sourceVertex.property(bulkLoader.getVertexIdProperty(), targetVertex.id());
+ messenger.sendMessage(messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
}
} else {
if (bulkLoader.useUserSuppliedIds()) {
@@ -207,13 +237,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
idPairs.put(idPair.getValue(0), idPair.getValue(1));
}
// get the vertex given the dummy id property
- final Long outVId = sourceVertex.value(BULK_LOADER_VERTEX_ID);
+ final Long outVId = sourceVertex.value(bulkLoader.getVertexIdProperty());
final Vertex outV = bulkLoader.getVertexById(outVId, graph, g);
// for all the incoming edges of the vertex, get the incoming adjacent vertex and write the edge and its properties
sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
final Object inVId = idPairs.get(edge.inVertex().id());
final Vertex inV = bulkLoader.getVertexById(inVId, graph, g);
bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+ this.commit(false);
});
}
}
@@ -231,9 +262,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
- return new HashSet<MessageScope>() {{
- add(messageScope);
- }};
+ return Collections.singleton(messageScope);
}
@Override
@@ -266,29 +295,11 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
super(BulkLoaderVertexProgram.class);
}
- public Builder graphConfiguration(final String propertiesFileLocation) {
- try {
- final Properties properties = new Properties();
- properties.load(new FileInputStream(propertiesFileLocation));
- properties.forEach((key, value) -> configuration.setProperty(BULK_LOADER_GRAPH_CFG_PREFIX + "." + key, value));
- return this;
- } catch (final Exception e) {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- }
-
- public Builder bulkLoaderClass(final Class<? extends BulkLoader> bulkLoaderClass) {
- return bulkLoaderClass(bulkLoaderClass, null);
- }
-
- public Builder bulkLoaderClass(final Class<? extends BulkLoader> bulkLoaderClass, final Configuration configuration) {
- this.configuration.setProperty(BULK_LOADER_CLASS, bulkLoaderClass.getCanonicalName());
- if (configuration != null) {
- configuration.getKeys().forEachRemaining(key -> this.configuration.addProperty(
- BULK_LOADER_CFG_PREFIX + "." + key, configuration.getProperty(key)
- ));
- }
- return this;
+ @SuppressWarnings("unchecked")
+ @Override
+ public BulkLoaderVertexProgram create(final Graph graph) {
+ ConfigurationUtils.append(graph.configuration().subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX), configuration);
+ return (BulkLoaderVertexProgram) VertexProgram.createVertexProgram(graph, this.configuration);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
index 601079d..029d379 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
@@ -31,9 +31,10 @@ import org.apache.tinkerpop.gremlin.structure.VertexProperty;
*/
public class DefaultBulkLoader implements BulkLoader {
- public final static String USE_USER_SUPPLIED_IDS_CFG_KEY = "use-user-supplied-ids";
- public final static String STORE_ORIGINAL_IDS_CFG_KEY = "store-original-ids";
+ public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
+ public final static String STORE_ORIGINAL_IDS_CFG_KEY = "storeOriginalIds";
+ private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
private boolean storeOriginalIds = false;
private boolean useUserSuppliedIds = false;
@@ -47,7 +48,7 @@ public class DefaultBulkLoader implements BulkLoader {
}
final Vertex v = graph.addVertex(T.label, vertex.label());
if (storeOriginalIds()) {
- v.property(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id());
+ v.property(bulkLoaderVertexId, vertex.id());
}
return v;
}
@@ -79,7 +80,7 @@ public class DefaultBulkLoader implements BulkLoader {
public Vertex getVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
return useUserSuppliedIds()
? getVertexById(vertex.id(), graph, g)
- : g.V().has(vertex.label(), BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id()).next();
+ : g.V().has(vertex.label(), bulkLoaderVertexId, vertex.id()).next();
}
/**
@@ -102,9 +103,20 @@ public class DefaultBulkLoader implements BulkLoader {
* {@inheritDoc}
*/
@Override
+ public String getVertexIdProperty() {
+ return bulkLoaderVertexId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void configure(final Configuration configuration) {
- if (configuration.containsKey(USE_USER_SUPPLIED_IDS_CFG_KEY)) {
- useUserSuppliedIds = configuration.getBoolean(USE_USER_SUPPLIED_IDS_CFG_KEY);
+ if (configuration.containsKey(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY)) {
+ bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
+ }
+ if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
+ useUserSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
}
if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
storeOriginalIds = configuration.getBoolean(STORE_ORIGINAL_IDS_CFG_KEY);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/abb9a73b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
index d87df73..c4e0737 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -44,7 +44,7 @@ public class IncrementalBulkLoader extends DefaultBulkLoader {
public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
final Iterator<Vertex> iterator = useUserSuppliedIds()
? graph.vertices(vertex.id())
- : g.V().has(vertex.label(), BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID, vertex.id());
+ : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
return iterator.hasNext() ? iterator.next() : super.getOrCreateVertex(vertex, graph, g);
}