You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/05 02:56:44 UTC
[26/50] [abbrv] incubator-tinkerpop git commit: Optimized BLVP
Optimized BLVP
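BulkLoaderVertexProgram (BLVP) now uses EventStrategy to monitor what the actual BulkLoader implementation does (e.g. whether it creates a new vertex or just returns an existing one); the number of observed mutations also determines when intermediate commits happen.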
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8b8222da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8b8222da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8b8222da
Branch: refs/heads/TINKERPOP3-923
Commit: 8b8222da7fed57fa1e7e0232ab788944716404f7
Parents: 19b7ae0
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Tue Nov 3 15:14:12 2015 +0100
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Tue Nov 3 15:14:12 2015 +0100
----------------------------------------------------------------------
.../computer/bulkloading/BulkLoader.java | 31 +++++
.../bulkloading/BulkLoaderVertexProgram.java | 116 ++++++++++++++++---
.../bulkloading/IncrementalBulkLoader.java | 14 ++-
.../structure/TinkerGraphPlayTest.java | 30 ++---
4 files changed, 158 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index dd5eaf4..2640d2e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -41,6 +41,22 @@ public interface BulkLoader {
public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g);
/**
+ * Creates a clone of the given edge between the given out- and in-vertices, copying the edge label and all
+ * of its properties.
+ *
+ * @param edge The edge to be cloned.
+ * @param outVertex The out-vertex in the given graph.
+ * @param inVertex The in-vertex in the given graph.
+ * @param graph The graph that holds the cloned edge after this method was called (unused by the default
+ * implementation).
+ * @param g A standard traversal source for the given graph (unused by the default implementation).
+ * @return The cloned edge.
+ */
+ public default Edge createEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
+ // NOTE(review): the edge is added via the structure API (Vertex.addEdge), not via g. If g is decorated with a
+ // strategy such as EventStrategy, this mutation is presumably not reported to its listeners - confirm.
+ final Edge result = outVertex.addEdge(edge.label(), inVertex);
+ edge.properties().forEachRemaining(property -> result.property(property.key(), property.value()));
+ return result;
+ }
+
+ /**
* Gets or creates a clone of the given edge between the given in- and out-vertices.
*
* @param edge The edge to be cloned.
@@ -53,6 +69,21 @@ public interface BulkLoader {
public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g);
/**
+ * Creates a clone of the given property, including all of its meta-properties, on the given vertex.
+ *
+ * @param property The property to be cloned.
+ * @param vertex The vertex in the given graph.
+ * @param graph The graph that holds the given vertex (unused by the default implementation).
+ * @param g A standard traversal source for the given graph (unused by the default implementation).
+ * @return The cloned property.
+ */
+ public default VertexProperty createVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
+ // NOTE(review): Vertex.property(key, value) presumably applies the graph's default cardinality for the key,
+ // and the call bypasses g (and any strategy decorating it) - confirm both are intended.
+ final VertexProperty<?> result = vertex.property(property.key(), property.value());
+ property.properties().forEachRemaining(metaProperty -> result.property(metaProperty.key(), metaProperty.value()));
+ return result;
+ }
+
+ /**
* Gets or creates a clone of the given property for the given vertex.
*
* @param property The property to be cloned.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 99bb1f5..2950bfd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -31,8 +31,12 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.MutationListener;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
@@ -48,7 +52,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
/**
* @author Daniel Kuppitz (http://gremlin.guru)
@@ -74,12 +77,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
private GraphTraversalSource g;
private long intermediateBatchSize;
- private static final ThreadLocal<AtomicLong> counter = new ThreadLocal<AtomicLong>() {
- @Override
- protected AtomicLong initialValue() {
- return new AtomicLong();
- }
- };
+ private BulkLoadingListener listener;
private BulkLoaderVertexProgram() {
messageScope = MessageScope.Local.of(__::inE);
@@ -116,17 +114,19 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
* @param close Whether to close the current graph instance after calling commit() or not.
*/
private void commit(final boolean close) {
- if (!close && (intermediateBatchSize == 0L || counter.get().incrementAndGet() % intermediateBatchSize != 0))
+ if (!close && (intermediateBatchSize == 0L || listener.mutations() < intermediateBatchSize))
return;
if (null != graph) {
if (graph.features().graph().supportsTransactions()) {
- LOGGER.info("Committing transaction on Graph instance: {} [{}]", graph, counter.get().get());
+ LOGGER.info("Committing transaction on Graph instance: {} [{} mutations]", graph, listener.mutations());
try {
graph.tx().commit();
LOGGER.debug("Committed transaction on Graph instance: {}", graph);
+ listener.resetCounter();
} catch (Exception e) {
LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
graph.tx().rollback();
+ listener.resetCounter();
throw e;
}
}
@@ -144,7 +144,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
@Override
public void setup(final Memory memory) {
- counter.get().set(0L);
+ // Nothing to initialize here: mutations are now counted by the BulkLoadingListener that is created
+ // together with the write graph's traversal source.
}
@Override
@@ -172,7 +171,8 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
graph = GraphFactory.open(configuration.subset(WRITE_GRAPH_CFG_KEY));
LOGGER.info("Opened Graph instance: {}", graph);
try {
- g = graph.traversal();
+ listener = new BulkLoadingListener();
+ g = GraphTraversalSource.build().with(EventStrategy.build().addListener(listener).create()).create(graph);
} catch (Exception e) {
try {
graph.close();
@@ -205,12 +205,15 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
private void executeInternal(final Vertex sourceVertex, final Messenger<Tuple> messenger, final Memory memory) {
if (memory.isInitialIteration()) {
+ this.listener.resetStats();
// get or create the vertex
final Vertex targetVertex = bulkLoader.getOrCreateVertex(sourceVertex, graph, g);
// write all the properties of the vertex to the newly created vertex
final Iterator<VertexProperty<Object>> vpi = sourceVertex.properties();
- while (vpi.hasNext()) {
- bulkLoader.getOrCreateVertexProperty(vpi.next(), targetVertex, graph, g);
+ if (this.listener.isNewVertex()) {
+ vpi.forEachRemaining(vp -> bulkLoader.createVertexProperty(vp, targetVertex, graph, g));
+ } else {
+ vpi.forEachRemaining(vp -> bulkLoader.getOrCreateVertexProperty(vp, targetVertex, graph, g));
}
this.commit(false);
if (!bulkLoader.useUserSuppliedIds()) {
@@ -221,9 +224,14 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
} else if (memory.getIteration() == 1) {
if (bulkLoader.useUserSuppliedIds()) {
final Vertex outV = bulkLoader.getVertex(sourceVertex, graph, g);
+ final boolean incremental = outV.edges(Direction.OUT).hasNext();
sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
final Vertex inV = bulkLoader.getVertex(edge.inVertex(), graph, g);
- bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+ if (incremental) {
+ bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+ } else {
+ bulkLoader.createEdge(edge, outV, inV, graph, g);
+ }
this.commit(false);
});
} else {
@@ -407,4 +415,84 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
}
};
}
+
+ /**
+ * {@link MutationListener} registered with the {@link EventStrategy} of the write graph's traversal source.
+ * It counts every reported mutation (the count decides when an intermediate batch is committed) and records
+ * whether a vertex has been added since the last call to {@link #resetStats()}.
+ * <p>
+ * NOTE(review): presumably only mutations that go through the EventStrategy-decorated traversal source are
+ * reported here. If so, structure-API calls such as {@code Vertex.addEdge(...)} or {@code Vertex.property(...)}
+ * (used by the default {@code BulkLoader.createEdge}/{@code createVertexProperty}) do not advance
+ * {@link #mutations()}, and intermediate commits may never fire while edges/properties are loaded - confirm.
+ */
+ static class BulkLoadingListener implements MutationListener {
+
+ /** Number of mutations reported since construction or the last {@link #resetCounter()}. */
+ private long counter;
+ /** Whether {@link #vertexAdded(Vertex)} was called since construction or the last {@link #resetStats()}. */
+ private boolean isNewVertex;
+
+ public BulkLoadingListener() {
+ this.counter = 0L;
+ this.isNewVertex = false;
+ }
+
+ /**
+ * @return {@code true} if a vertex was added since the last {@link #resetStats()}.
+ */
+ public boolean isNewVertex() {
+ return this.isNewVertex;
+ }
+
+ /**
+ * @return the number of mutations reported since the last {@link #resetCounter()}.
+ */
+ public long mutations() {
+ return this.counter;
+ }
+
+ /**
+ * Clears the new-vertex flag; the mutation counter is left untouched.
+ */
+ public void resetStats() {
+ this.isNewVertex = false;
+ }
+
+ /**
+ * Resets the mutation counter to zero; the new-vertex flag is left untouched.
+ */
+ public void resetCounter() {
+ this.counter = 0L;
+ }
+
+ @Override
+ public void vertexAdded(final Vertex vertex) {
+ this.isNewVertex = true;
+ this.counter++;
+ }
+
+ @Override
+ public void vertexRemoved(final Vertex vertex) {
+ this.counter++;
+ }
+
+ @Override
+ public void vertexPropertyChanged(final Vertex element, final Property oldValue, final Object setValue,
+ final Object... vertexPropertyKeyValues) {
+ this.counter++;
+ }
+
+ @Override
+ public void vertexPropertyRemoved(final VertexProperty vertexProperty) {
+ this.counter++;
+ }
+
+ @Override
+ public void edgeAdded(final Edge edge) {
+ this.counter++;
+ }
+
+ @Override
+ public void edgeRemoved(final Edge edge) {
+ this.counter++;
+ }
+
+ @Override
+ public void edgePropertyChanged(final Edge element, final Property oldValue, final Object setValue) {
+ this.counter++;
+ }
+
+ @Override
+ public void edgePropertyRemoved(final Edge element, final Property property) {
+ this.counter++;
+ }
+
+ @Override
+ public void vertexPropertyPropertyChanged(final VertexProperty element, final Property oldValue, final Object setValue) {
+ this.counter++;
+ }
+
+ @Override
+ public void vertexPropertyPropertyRemoved(final VertexProperty element, final Property property) {
+ this.counter++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
index f03cd18..8219515 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
@@ -51,8 +51,10 @@ public class IncrementalBulkLoader implements BulkLoader {
return iterator.hasNext()
? iterator.next()
: useUserSuppliedIds()
- ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
- : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
+ ? g.addV(vertex.label()).property(T.id, vertex.id()).next()
+ : g.addV(vertex.label()).property(getVertexIdProperty(), vertex.id()).next();
+ //? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
+ //: graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
}
/**
@@ -71,8 +73,7 @@ public class IncrementalBulkLoader implements BulkLoader {
}
});
} else {
- e = outVertex.addEdge(edge.label(), inVertex);
- edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+ e = createEdge(edge, outVertex, inVertex, graph, g);
}
return e;
}
@@ -84,7 +85,10 @@ public class IncrementalBulkLoader implements BulkLoader {
public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
final VertexProperty<?> vp;
final VertexProperty<?> existing = vertex.property(property.key());
- if (!existing.isPresent() || !existing.value().equals(property.value())) {
+ if (!existing.isPresent()) {
+ return createVertexProperty(property, vertex, graph, g);
+ }
+ if (!existing.value().equals(property.value())) {
vp = vertex.property(property.key(), property.value());
} else {
vp = existing;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b8222da/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
index 06994e2..e7c54ce 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraphPlayTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.tinkergraph.structure;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
@@ -30,10 +31,12 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.graphml.GraphMLIo;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
@@ -192,20 +195,19 @@ public class TinkerGraphPlayTest {
@Test
@Ignore
public void testPlayDK() throws Exception {
- final Graph graph = TinkerFactory.createModern();
- final GraphTraversalSource g = graph.traversal();
- Traversal traversal = g.V().where(out().and().in()).profile().cap(TraversalMetrics.METRICS_KEY);
- //traversal.forEachRemaining(System.out::println);
- System.out.println(traversal.toString());
- traversal.asAdmin().applyStrategies();
- System.out.println(traversal.toString());
- traversal.forEachRemaining(System.out::println);
- traversal = g.V().where(and(out(), in())).profile().cap(TraversalMetrics.METRICS_KEY);
- //traversal.forEachRemaining(System.out::println);
- System.out.println(traversal.toString());
- traversal.asAdmin().applyStrategies();
- System.out.println(traversal.toString());
- //System.out.println(traversal.toString());
+
+ // Ad-hoc (@Ignore'd) BulkLoaderVertexProgram check: bulk-load the "modern" graph (graph1) into graph2, which
+ // is pre-populated with the modern graph, and into graph3, both with user-supplied ids.
+ // NOTE(review): assumes /tmp/graph2.properties and /tmp/graph3.properties already exist and persist to
+ // /tmp/tinkergraph2.kryo and /tmp/tinkergraph3.kryo respectively - confirm; otherwise the deleteOnExit calls
+ // below clean up nothing and graph2's pre-populated data is not persisted. graph1 is never closed.
+ new File("/tmp/tinkergraph2.kryo").deleteOnExit();
+ new File("/tmp/tinkergraph3.kryo").deleteOnExit();
+
+ final Graph graph1 = TinkerFactory.createModern();
+ final Graph graph2 = GraphFactory.open("/tmp/graph2.properties");
+ TinkerFactory.generateModern((TinkerGraph) graph2);
+ graph2.close();
+
+ // graph2 already holds the same elements, so this run should mostly hit the "get existing" code paths.
+ System.out.println("graph1 -> graph2");
+ graph1.compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph("/tmp/graph2.properties").create(graph1)).submit().get();
+ // graph3 is presumably empty (TODO confirm), so this run should take the "create" code paths.
+ System.out.println("graph1 -> graph3");
+ graph1.compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph("/tmp/graph3.properties").create(graph1)).submit().get();
}
@Test