You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/09/01 16:26:28 UTC
[12/19] incubator-tinkerpop git commit: Big cleanup after testing failure scenarios.
Big cleanup after testing failure scenarios.
Removed the old DefaultBulkLoader and renamed IncrementalBulkLoader to DefaultBulkLoader, so the incremental (get-or-create) loading logic is now the default bulk loader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9f467c4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9f467c4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9f467c4c
Branch: refs/heads/tp30
Commit: 9f467c4cf8c78d28bbf914e9465c045be3b558ce
Parents: 4978e33
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Sat Aug 29 03:17:20 2015 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Sat Aug 29 03:17:20 2015 +0200
----------------------------------------------------------------------
.../computer/bulkloading/BulkLoader.java | 6 +-
.../bulkloading/BulkLoaderVertexProgram.java | 39 ++++---
.../computer/bulkloading/DefaultBulkLoader.java | 70 ++++++++----
.../bulkloading/IncrementalBulkLoader.java | 112 -------------------
4 files changed, 76 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
index 5619971..dd5eaf4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoader.java
@@ -95,11 +95,9 @@ public interface BulkLoader {
public boolean useUserSuppliedIds();
/**
- * @return Whether original vertex identifiers are stored in the target graph or not.
+ * @return Whether to keep the original vertex identifiers in the target graph or not.
*/
- public default boolean storeOriginalIds() {
- return !useUserSuppliedIds();
- }
+ public boolean keepOriginalIds();
/**
* @return The name of the vertex property that is used to store the original vertex id.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 2f36a6e..b8219fb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -54,10 +54,10 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkLoaderVertexProgram.class);
- public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoading";
- private static final String GRAPH_CFG_KEY = "graph";
- private static final String BULK_LOADER_CFG_KEY = "loader";
- private static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
+ public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoaderVertexProgram";
+ public static final String GRAPH_CFG_KEY = "graph";
+ public static final String BULK_LOADER_CFG_KEY = "loader";
+ public static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = "intermediateBatchSize";
public static final String BULK_LOADER_VERTEX_ID_CFG_KEY = "vertexIdProperty";
public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";
@@ -117,9 +117,8 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
if (graph.features().graph().supportsTransactions()) {
LOGGER.info("Committing transaction on Graph instance: {}", graph);
try {
- graph.tx().commit(); // TODO will Giraph/MR restart the program and re-run execute if this fails?
+ graph.tx().commit();
LOGGER.debug("Committed transaction on Graph instance: {}", graph);
- counter = 0L;
} catch (Exception e) {
LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
graph.tx().rollback();
@@ -140,7 +139,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
@Override
public void setup(final Memory memory) {
-
}
@Override
@@ -156,6 +154,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
}
intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
elementComputeKeys.add(configuration.subset(BULK_LOADER_CFG_KEY).getString(BULK_LOADER_VERTEX_ID_CFG_KEY));
+ bulkLoader = createBulkLoader();
}
@Override
@@ -176,7 +175,6 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
throw new IllegalStateException("The given graph instance does not allow concurrent access.");
}
g = graph.traversal();
- bulkLoader = createBulkLoader();
} catch (Exception e) {
try {
graph.close();
@@ -222,12 +220,13 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
sourceVertex.property(bulkLoader.getVertexIdProperty(), targetVertex.id());
messenger.sendMessage(messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
}
- } else {
+ } else if (memory.getIteration() == 1) {
if (bulkLoader.useUserSuppliedIds()) {
final Vertex outV = bulkLoader.getVertex(sourceVertex, graph, g);
sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
final Vertex inV = bulkLoader.getVertex(edge.inVertex(), graph, g);
bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
+ this.commit(false);
});
} else {
// create an id map and populate it with all the incoming messages
@@ -237,7 +236,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
final Tuple idPair = idi.next();
idPairs.put(idPair.getValue(0), idPair.getValue(1));
}
- // get the vertex given the dummy id property
+ // get the vertex with the given dummy id property
final Long outVId = sourceVertex.value(bulkLoader.getVertexIdProperty());
final Vertex outV = bulkLoader.getVertexById(outVId, graph, g);
// for all the incoming edges of the vertex, get the incoming adjacent vertex and write the edge and its properties
@@ -248,12 +247,23 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
this.commit(false);
});
}
+ } else if (memory.getIteration() == 2) {
+ final Long vertexId = sourceVertex.value(bulkLoader.getVertexIdProperty());
+ bulkLoader.getVertexById(vertexId, graph, g)
+ .property(bulkLoader.getVertexIdProperty()).remove();
+ this.commit(false);
}
}
@Override
public boolean terminate(final Memory memory) {
- return memory.getIteration() >= 1;
+ switch (memory.getIteration()) {
+ case 1:
+ return bulkLoader.keepOriginalIds();
+ case 2:
+ return true;
+ }
+ return false;
}
@Override
@@ -266,6 +276,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
return Collections.singleton(messageScope);
}
+ @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "CloneDoesntCallSuperClone"})
@Override
public VertexProgram<Tuple> clone() {
return this;
@@ -273,17 +284,17 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
- return GraphComputer.ResultGraph.NEW;
+ return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
- return GraphComputer.Persist.EDGES;
+ return GraphComputer.Persist.NOTHING;
}
@Override
public String toString() {
- return StringFactory.vertexProgramString(this, "");
+ return StringFactory.vertexProgramString(this, bulkLoader != null ? bulkLoader.getClass().getSimpleName() : null);
}
public static Builder build() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
index 029d379..75d3494 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/DefaultBulkLoader.java
@@ -19,38 +19,43 @@
package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import java.util.Iterator;
+
/**
* @author Daniel Kuppitz (http://gremlin.guru)
*/
public class DefaultBulkLoader implements BulkLoader {
public final static String USER_SUPPLIED_IDS_CFG_KEY = "userSuppliedIds";
- public final static String STORE_ORIGINAL_IDS_CFG_KEY = "storeOriginalIds";
+ public final static String KEEP_ORIGINAL_IDS_CFG_KEY = "keepOriginalIds";
private String bulkLoaderVertexId = BulkLoaderVertexProgram.DEFAULT_BULK_LOADER_VERTEX_ID;
- private boolean storeOriginalIds = false;
- private boolean useUserSuppliedIds = false;
+ private boolean keepOriginalIds = true;
+ private boolean userSuppliedIds = false;
/**
* {@inheritDoc}
*/
@Override
public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
- if (useUserSuppliedIds()) {
- return graph.addVertex(T.id, vertex.id(), T.label, vertex.label());
- }
- final Vertex v = graph.addVertex(T.label, vertex.label());
- if (storeOriginalIds()) {
- v.property(bulkLoaderVertexId, vertex.id());
- }
- return v;
+ final Iterator<Vertex> iterator = useUserSuppliedIds()
+ ? graph.vertices(vertex.id())
+ : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
+ return iterator.hasNext()
+ ? iterator.next()
+ : useUserSuppliedIds()
+ ? graph.addVertex(T.id, vertex.id(), T.label, vertex.label())
+ : graph.addVertex(T.label, vertex.label(), getVertexIdProperty(), vertex.id());
}
/**
@@ -58,8 +63,20 @@ public class DefaultBulkLoader implements BulkLoader {
*/
@Override
public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
- final Edge e = outVertex.addEdge(edge.label(), inVertex);
- edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+ final Edge e;
+ final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
+ if (t.hasNext()) {
+ e = t.next();
+ edge.properties().forEachRemaining(property -> {
+ final Property<?> existing = e.property(property.key());
+ if (!existing.isPresent() || !existing.value().equals(property.value())) {
+ e.property(property.key(), property.value());
+ }
+ });
+ } else {
+ e = outVertex.addEdge(edge.label(), inVertex);
+ edge.properties().forEachRemaining(property -> e.property(property.key(), property.value()));
+ }
return e;
}
@@ -68,8 +85,19 @@ public class DefaultBulkLoader implements BulkLoader {
*/
@Override
public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
- final VertexProperty<?> vp = vertex.property(property.key(), property.value());
- vp.properties().forEachRemaining(metaProperty -> property.property(metaProperty.key(), metaProperty.value()));
+ final VertexProperty<?> vp;
+ final VertexProperty<?> existing = vertex.property(property.key());
+ if (!existing.isPresent() || !existing.value().equals(property.value())) {
+ vp = vertex.property(property.key(), property.value());
+ } else {
+ vp = existing;
+ }
+ property.properties().forEachRemaining(metaProperty -> {
+ final Property<?> existing2 = vp.property(metaProperty.key());
+ if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
+ vp.property(metaProperty.key(), metaProperty.value());
+ }
+ });
return vp;
}
@@ -88,15 +116,15 @@ public class DefaultBulkLoader implements BulkLoader {
*/
@Override
public boolean useUserSuppliedIds() {
- return useUserSuppliedIds;
+ return userSuppliedIds;
}
/**
* {@inheritDoc}
*/
@Override
- public boolean storeOriginalIds() {
- return storeOriginalIds;
+ public boolean keepOriginalIds() {
+ return keepOriginalIds;
}
/**
@@ -116,10 +144,10 @@ public class DefaultBulkLoader implements BulkLoader {
bulkLoaderVertexId = configuration.getString(BulkLoaderVertexProgram.BULK_LOADER_VERTEX_ID_CFG_KEY);
}
if (configuration.containsKey(USER_SUPPLIED_IDS_CFG_KEY)) {
- useUserSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
+ userSuppliedIds = configuration.getBoolean(USER_SUPPLIED_IDS_CFG_KEY);
}
- if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
- storeOriginalIds = configuration.getBoolean(STORE_ORIGINAL_IDS_CFG_KEY);
+ if (configuration.containsKey(KEEP_ORIGINAL_IDS_CFG_KEY)) {
+ keepOriginalIds = configuration.getBoolean(KEEP_ORIGINAL_IDS_CFG_KEY);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9f467c4c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
deleted file mode 100644
index 7822c2d..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/IncrementalBulkLoader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-
-/**
- * @author Daniel Kuppitz (http://gremlin.guru)
- */
-public class IncrementalBulkLoader extends DefaultBulkLoader {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalBulkLoader.class);
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Vertex getOrCreateVertex(final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
- final Iterator<Vertex> iterator = useUserSuppliedIds()
- ? graph.vertices(vertex.id())
- : g.V().has(vertex.label(), getVertexIdProperty(), vertex.id());
- return iterator.hasNext() ? iterator.next() : super.getOrCreateVertex(vertex, graph, g);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Edge getOrCreateEdge(final Edge edge, final Vertex outVertex, final Vertex inVertex, final Graph graph, final GraphTraversalSource g) {
- final Traversal<Vertex, Edge> t = g.V(outVertex).outE(edge.label()).filter(__.inV().is(inVertex));
- if (t.hasNext()) {
- final Edge e = t.next();
- edge.properties().forEachRemaining(property -> {
- final Property<?> existing = e.property(property.key());
- if (!existing.isPresent() || !existing.value().equals(property.value())) {
- e.property(property.key(), property.value());
- }
- });
- return e;
- }
- return super.getOrCreateEdge(edge, outVertex, inVertex, graph, g);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public VertexProperty getOrCreateVertexProperty(final VertexProperty<?> property, final Vertex vertex, final Graph graph, final GraphTraversalSource g) {
- final VertexProperty<?> vp;
- final VertexProperty<?> existing = vertex.property(property.key());
- if (!existing.isPresent() || !existing.value().equals(property.value())) {
- vp = vertex.property(property.key(), property.value());
- } else {
- vp = existing;
- }
- property.properties().forEachRemaining(metaProperty -> {
- final Property<?> existing2 = vp.property(metaProperty.key());
- if (!existing2.isPresent() || !existing2.value().equals(metaProperty.value())) {
- vp.property(metaProperty.key(), metaProperty.value());
- }
- });
- return vp;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean storeOriginalIds() {
- return !useUserSuppliedIds();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void configure(final Configuration configuration) {
- super.configure(configuration);
- if (configuration.containsKey(STORE_ORIGINAL_IDS_CFG_KEY)) {
- LOGGER.warn("{} automatically determines whether original identifiers should be stored or not, hence the " +
- "configuration setting '{}' will be ignored.", this.getClass(), STORE_ORIGINAL_IDS_CFG_KEY);
- }
- }
-}