You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/22 17:50:18 UTC
incubator-tinkerpop git commit: added
GraphComputerTest.shouldSupportTransientKeys(). Ensured that only
Memory.set() is allowed in setup()/terminate() and Memory.add() in execute().
Fixed up SparkGraphComputer, TinkerGraphComputer, and GiraphSparkCompute
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1166 7bf8213a5 -> 706759c1d
added GraphComputerTest.shouldSupportTransientKeys(). Ensured that only Memory.set() is allowed in setup()/terminate() and Memory.add() in execute(). Fixed up SparkGraphComputer, TinkerGraphComputer, and GiraphSparkComputer to respect the new MemoryComputeKey semantics and transient key semantics.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/706759c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/706759c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/706759c1
Branch: refs/heads/TINKERPOP-1166
Commit: 706759c1dbf9df6bf9210913001658ca9f9ff513
Parents: 7bf8213
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 22 09:50:06 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 22 09:50:06 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 2 +-
.../giraph/process/computer/GiraphMemory.java | 25 ++-
.../giraph/structure/io/GiraphVertexWriter.java | 11 ++
.../gremlin/process/computer/Memory.java | 10 +-
.../process/computer/VertexComputeKey.java | 7 +-
.../traversal/TraversalVertexProgram.java | 13 +-
.../computer/util/VertexProgramHelper.java | 8 +
.../process/computer/GraphComputerTest.java | 187 +++++++++++++++++--
.../pagerank/PageRankVertexProgramTest.java | 6 +-
.../spark/process/computer/SparkExecutor.java | 12 +-
.../process/computer/SparkGraphComputer.java | 5 +-
.../spark/process/computer/SparkMemory.java | 9 +-
.../process/computer/TinkerGraphComputer.java | 12 +-
.../computer/TinkerGraphComputerView.java | 9 +-
.../process/computer/TinkerMemory.java | 15 +-
15 files changed, 271 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 83eb436..f3c1624 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -199,7 +199,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs"); // how do I get the exception that occured?
// add vertex program memory values to the return memory
for (final MemoryComputeKey memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
- if (storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()))) {
+ if (!memoryKey.isTransient() && storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()))) {
final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey())));
if (iterator.hasNext()) {
this.memory.set(memoryKey.getKey(), iterator.next().getValue());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
index f453b5a..bc7cc04 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
@@ -100,10 +100,13 @@ public final class GiraphMemory extends MasterCompute implements Memory {
if (null != outputLocation) {
try {
for (final String key : this.keys()) {
- final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
- writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
- writer.close();
+ if (!this.memoryKeys.get(key).isTransient()) { // do not write transient memory keys to disk
+ final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
+ writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
+ writer.close();
+ }
}
+ // written for GiraphGraphComputer to read and then is deleted by GiraphGraphComputer
final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + Constants.HIDDEN_ITERATION), ObjectWritable.class, ObjectWritable.class);
writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.getIteration()));
writer.close();
@@ -157,21 +160,17 @@ public final class GiraphMemory extends MasterCompute implements Memory {
@Override
public void set(final String key, final Object value) {
this.checkKeyValue(key, value);
- if (this.isMasterCompute) { // only called on setup() and terminate()
- this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
- } else {
- this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
- }
+ if (!this.isMasterCompute) // only called on setup() and terminate()
+ throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
+ this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
}
@Override
public void add(final String key, final Object value) {
this.checkKeyValue(key, value);
- if (this.isMasterCompute) { // only called on setup() and terminate()
- this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
- } else {
- this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
- }
+ if (this.isMasterCompute)
+ throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
+ this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
index 9bf5c13..e1e946d 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -29,14 +29,21 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import java.io.IOException;
+import java.util.stream.Collectors;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class GiraphVertexWriter extends VertexWriter {
private RecordWriter<NullWritable, VertexWritable> recordWriter;
+ private String[] transientComputeKeys;
public GiraphVertexWriter() {
@@ -46,6 +53,9 @@ public final class GiraphVertexWriter extends VertexWriter {
public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
final Configuration configuration = context.getConfiguration();
this.recordWriter = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration).getRecordWriter(context);
+ this.transientComputeKeys = VertexProgramHelper.vertexComputeKeysAsArray(((VertexProgram<?>) VertexProgram.createVertexProgram(EmptyGraph.instance(), ConfUtil.makeApacheConfiguration(configuration))).getVertexComputeKeys().stream().
+ filter(VertexComputeKey::isTransient).
+ collect(Collectors.toSet()));
}
@Override
@@ -55,6 +65,7 @@ public final class GiraphVertexWriter extends VertexWriter {
@Override
public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+ ((GiraphVertex) vertex).getValue().get().dropVertexProperties(this.transientComputeKeys); // remove all transient compute keys before writing to OutputFormat
this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
index 16dbb78..fdae56e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -71,7 +71,7 @@ public interface Memory {
* @param key they key to set a value for
* @param value the value to set for the key
*/
- public void add(final String key, final Object value);
+ public void add(final String key, final Object value) throws IllegalArgumentException, IllegalStateException;
/**
* A helper method that generates a {@link Map} of the memory key/values.
@@ -153,6 +153,14 @@ public interface Memory {
return new IllegalArgumentException("The memory does not have a value for provided key: " + key);
}
+ public static IllegalArgumentException memorySetOnlyDuringVertexProgramSetUpAndTerminate(final String key) {
+ return new IllegalArgumentException("The memory can only be set() during vertex program setup and terminate: " + key);
+ }
+
+ public static IllegalArgumentException memoryAddOnlyDuringVertexProgramExecute(final String key) {
+ return new IllegalArgumentException("The memory can only be add() during vertex program execute: " + key);
+ }
+
public static UnsupportedOperationException dataTypeOfMemoryValueNotSupported(final Object val) {
return new UnsupportedOperationException(String.format("Graph computer memory value [%s] is of type %s is not supported", val, val.getClass()));
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
index 3b106b7..43ba917 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
@@ -19,10 +19,14 @@
package org.apache.tinkerpop.gremlin.process.computer;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+
+import java.io.Serializable;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class VertexComputeKey {
+public final class VertexComputeKey implements Serializable {
private final String key;
private final boolean isTransient;
@@ -30,6 +34,7 @@ public final class VertexComputeKey {
private VertexComputeKey(final String key, final boolean isTransient) {
this.key = key;
this.isTransient = isTransient;
+ ElementHelper.validateProperty(key, key);
}
public String getKey() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 29ff45e..ad71f8a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -80,12 +80,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
// TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
- private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(VertexComputeKey.of(HALTED_TRAVERSERS, false), VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, true)));
- private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true)));
+ private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
+ VertexComputeKey.of(HALTED_TRAVERSERS, false),
+ VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, false)));
private PureTraversal<?, ?> traversal;
private TraversalMatrix<?, ?> traversalMatrix;
-
private final Set<MapReduce> mapReducers = new HashSet<>();
private TraversalVertexProgram() {
@@ -186,7 +186,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
- return MEMORY_COMPUTE_KEYS;
+ final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
+ memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
+ for (final MapReduce mapReduce : this.mapReducers) {
+ memoryComputeKeys.add(MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
+ }
+ return memoryComputeKeys;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
index a0b0c25..bc67866 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
@@ -48,6 +48,14 @@ public final class VertexProgramHelper {
return set;
}
+ public static boolean isTransientVertexComputeKey(final String key, final Set<VertexComputeKey> vertexComputeKeySet) {
+ for (final VertexComputeKey vertexComputeKey : vertexComputeKeySet) {
+ if (vertexComputeKey.getKey().equals(key))
+ return vertexComputeKey.isTransient();
+ }
+ throw new IllegalArgumentException("Could not find key in vertex compute key set: " + key);
+ }
+
public static String[] vertexComputeKeysAsArray(final Set<VertexComputeKey> vertexComputeKeySet) {
return VertexProgramHelper.vertexComputeKeysAsSet(vertexComputeKeySet).toArray(new String[vertexComputeKeySet.size()]);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 9c087a9..097284b 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -272,7 +272,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
- return Collections.singleton(null);
+ return Collections.singleton(MemoryComputeKey.of(null, MemoryComputeKey.orOperator(), false));
}
@Override
@@ -571,10 +571,16 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
memory.set("d", false);
memory.set("e", true);
memory.set("f", memory.getIteration());
+ try {
+ memory.add("a", 0l);
+ fail("Should only allow Memory.set() during VertexProgram.setup()");
+ } catch (final Exception e) {
+ validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
+ }
}
@Override
- public void execute(Vertex vertex, Messenger messenger, Memory memory) {
+ public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
// test current step values
assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
assertEquals(Long.valueOf(0), memory.get("b"));
@@ -608,6 +614,12 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
}
assertTrue(memory.get("e"));
assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
+ try {
+ memory.set("a", 0l);
+ fail("Should only allow Memory.add() during VertexProgram.execute()");
+ } catch (final Exception e) {
+ validateException(Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate("a"), e);
+ }
}
@Override
@@ -620,6 +632,12 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
assertEquals(memory.getIteration() + 1, memory.<Integer>get("f").intValue());
memory.set("b", 0l);
memory.set("e", true);
+ try {
+ memory.add("a", 0l);
+ fail("Should only allow Memory.set() during VertexProgram.terminate()");
+ } catch (final Exception e) {
+ validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
+ }
return memory.getIteration() > 1;
}
@@ -1102,9 +1120,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerIterationStart(final Memory memory) {
- assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
+ assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
try {
- memory.set("test", memory.getIteration());
+ memory.add("test", memory.getIteration());
fail("Should throw an immutable memory exception");
} catch (IllegalStateException e) {
assertEquals(Memory.Exceptions.memoryIsCurrentlyImmutable().getMessage(), e.getMessage());
@@ -1113,8 +1131,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void execute(Vertex vertex, Messenger messenger, Memory memory) {
- assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
- memory.set("test", memory.getIteration() + 1);
+ assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
+ memory.add("test", 1);
}
@Override
@@ -1124,7 +1142,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerIterationEnd(final Memory memory) {
- assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
+ assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
try {
memory.set("test", memory.getIteration());
fail("Should throw an immutable memory exception");
@@ -1135,7 +1153,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
- return Collections.singleton(MemoryComputeKey.of("test", MemoryComputeKey.setOperator(), false));
+ return Collections.singleton(MemoryComputeKey.of("test", MemoryComputeKey.sumIntegerOperator(), false));
}
@Override
@@ -1848,7 +1866,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
assertEquals(6, graph1.traversal().V().count().next().intValue());
assertEquals(6, graph1.traversal().E().count().next().intValue());
assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
- assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
+ assertEquals(0, graph1.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
//
final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass())
.program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
@@ -1858,9 +1876,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
assertEquals(6, graph2.traversal().V().count().next().intValue());
assertEquals(6, graph2.traversal().E().count().next().intValue());
assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
- assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.VOTE_STRENGTH).count().next().intValue());
+ assertEquals(0, graph2.traversal().V().values(PeerPressureVertexProgram.VOTE_STRENGTH).count().next().intValue());
assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
- assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
+ assertEquals(0, graph2.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
//
final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
.program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
@@ -1878,9 +1896,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
assertEquals(6, graph3.traversal().E().count().next().intValue());
assertEquals(6, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
- assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.VOTE_STRENGTH).count().next().intValue());
+ assertEquals(0, graph3.traversal().V().values(PeerPressureVertexProgram.VOTE_STRENGTH).count().next().intValue());
assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
- assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
+ assertEquals(0, graph3.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
// TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job.
}
@@ -1945,4 +1963,147 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
}
+
+ ///////////////////////////////////
+
+ @Test
+ @LoadGraphWith(MODERN)
+ public void shouldSupportTransientKeys() throws Exception {
+ final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramO()).submit().get();
+ result.graph().vertices().forEachRemaining(vertex -> {
+ assertFalse(vertex.property("v1").isPresent());
+ assertFalse(vertex.property("v2").isPresent());
+ assertTrue(vertex.property("v3").isPresent());
+ assertEquals("shouldExist", vertex.value("v3"));
+ assertTrue(vertex.property("name").isPresent());
+ if (vertex.label().equals("software"))
+ assertTrue(vertex.property("lang").isPresent());
+ else
+ assertTrue(vertex.property("age").isPresent());
+ assertEquals(3, IteratorUtils.count(vertex.properties()));
+ assertEquals(0, IteratorUtils.count(vertex.properties("v1")));
+ assertEquals(0, IteratorUtils.count(vertex.properties("v2")));
+ assertEquals(1, IteratorUtils.count(vertex.properties("v3")));
+ assertEquals(1, IteratorUtils.count(vertex.properties("name")));
+ });
+ assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue());
+ assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue());
+ assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue());
+ assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue());
+ assertEquals(6l, result.graph().traversal().V().<String>values("name").dedup().count().next().longValue());
+ assertEquals(1l, result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue());
+ assertEquals("shouldExist", result.graph().traversal().V().<String>values("v3").dedup().next());
+ ///
+ assertFalse(result.memory().exists("m1"));
+ assertFalse(result.memory().exists("m2"));
+ assertTrue(result.memory().exists("m3"));
+ assertEquals(24l, result.memory().<Long>get("m3").longValue());
+ assertEquals(1, result.memory().keys().size());
+ }
+
+ private static class VertexProgramO extends StaticVertexProgram {
+
+ @Override
+ public void setup(final Memory memory) {
+ assertFalse(memory.exists("m1"));
+ assertFalse(memory.exists("m2"));
+ assertFalse(memory.exists("m3"));
+ }
+
+ @Override
+ public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+ if (memory.isInitialIteration()) {
+ assertFalse(vertex.property("v1").isPresent());
+ assertFalse(vertex.property("v2").isPresent());
+ assertFalse(vertex.property("v3").isPresent());
+ vertex.property("v1", "shouldNotExist");
+ vertex.property("v2", "shouldNotExist");
+ vertex.property("v3", "shouldExist");
+ assertTrue(vertex.property("v1").isPresent());
+ assertTrue(vertex.property("v2").isPresent());
+ assertTrue(vertex.property("v3").isPresent());
+ assertEquals("shouldNotExist", vertex.value("v1"));
+ assertEquals("shouldNotExist", vertex.value("v2"));
+ assertEquals("shouldExist", vertex.value("v3"));
+ //
+ assertFalse(memory.exists("m1"));
+ assertFalse(memory.exists("m2"));
+ assertFalse(memory.exists("m3"));
+ memory.add("m1", false);
+ memory.add("m2", true);
+ memory.add("m3", 2l);
+ // should still not exist as this pulls from the master memory
+ assertFalse(memory.exists("m1"));
+ assertFalse(memory.exists("m2"));
+ assertFalse(memory.exists("m3"));
+
+ } else {
+ assertTrue(vertex.property("v1").isPresent());
+ assertTrue(vertex.property("v2").isPresent());
+ assertTrue(vertex.property("v3").isPresent());
+ assertEquals("shouldNotExist", vertex.value("v1"));
+ assertEquals("shouldNotExist", vertex.value("v2"));
+ assertEquals("shouldExist", vertex.value("v3"));
+ //
+ assertTrue(memory.exists("m1"));
+ assertTrue(memory.exists("m2"));
+ assertTrue(memory.exists("m3"));
+ assertFalse(memory.get("m1"));
+ assertTrue(memory.get("m2"));
+ assertEquals(12l, memory.<Long>get("m3").longValue());
+ memory.add("m1", true);
+ memory.add("m2", true);
+ memory.add("m3", 2l);
+ }
+ }
+
+ @Override
+ public boolean terminate(final Memory memory) {
+ assertTrue(memory.exists("m1"));
+ assertTrue(memory.exists("m2"));
+ assertTrue(memory.exists("m3"));
+ if (memory.isInitialIteration()) {
+ assertFalse(memory.get("m1"));
+ assertTrue(memory.get("m2"));
+ assertEquals(12l, memory.<Long>get("m3").longValue());
+ return false;
+ } else {
+ assertTrue(memory.get("m1"));
+ assertTrue(memory.get("m2"));
+ assertEquals(24l, memory.<Long>get("m3").longValue());
+ return true;
+ }
+ }
+
+ @Override
+ public Set<MessageScope> getMessageScopes(final Memory memory) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<MemoryComputeKey> getMemoryComputeKeys() {
+ return new HashSet<>(Arrays.asList(
+ MemoryComputeKey.of("m1", MemoryComputeKey.orOperator(), true),
+ MemoryComputeKey.of("m2", MemoryComputeKey.andOperator(), true),
+ MemoryComputeKey.of("m3", MemoryComputeKey.sumLongOperator(), false)));
+ }
+
+ @Override
+ public Set<VertexComputeKey> getVertexComputeKeys() {
+ return new HashSet<>(Arrays.asList(
+ VertexComputeKey.of("v1", true),
+ VertexComputeKey.of("v2", true),
+ VertexComputeKey.of("v3", false)));
+ }
+
+ @Override
+ public GraphComputer.ResultGraph getPreferredResultGraph() {
+ return GraphComputer.ResultGraph.NEW;
+ }
+
+ @Override
+ public GraphComputer.Persist getPreferredPersist() {
+ return GraphComputer.Persist.VERTEX_PROPERTIES;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgramTest.java
index c74cb0c..cd89fa9 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgramTest.java
@@ -22,12 +22,12 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Test;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -41,9 +41,9 @@ public class PageRankVertexProgramTest extends AbstractGremlinProcessTest {
if (graphProvider.getGraphComputer(graph).features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
final ComputerResult result = graph.compute(graphProvider.getGraphComputer(graph).getClass()).program(PageRankVertexProgram.build().create(graph)).submit().get();
result.graph().traversal().V().forEachRemaining(v -> {
- assertEquals(4, v.keys().size()); // name, age/lang, edgeCount, pageRank
+ assertEquals(3, v.keys().size()); // name, age/lang, pageRank
assertTrue(v.keys().contains("name"));
- assertTrue(v.keys().contains(PageRankVertexProgram.EDGE_COUNT));
+ assertFalse(v.keys().contains(PageRankVertexProgram.EDGE_COUNT));
assertTrue(v.keys().contains(PageRankVertexProgram.PAGE_RANK));
assertEquals(1, IteratorUtils.count(v.values("name")));
assertEquals(1, IteratorUtils.count(v.values(PageRankVertexProgram.PAGE_RANK)));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 456c81b..3fd9ec0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
@@ -45,6 +46,7 @@ import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -154,17 +156,19 @@ public final class SparkExecutor {
return newViewIncomingRDD;
}
- public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
// the graphRDD and the viewRDD must have the same partitioner
assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
// attach the final computed view to the cached graph
return graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
- vertex.dropVertexProperties(elementComputeKeys);
final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
- view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
- // view.clear(); // no longer needed so kill it from memory
+ for (final DetachedVertexProperty<Object> property : view) {
+ vertex.dropVertexProperties(property.key());
+ if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
+ property.attach(Attachable.Method.create(vertex));
+ }
return tuple._1();
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index dee6ced..e368aa6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -51,7 +51,6 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -251,9 +250,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
memory.broadcastMemory(sparkContext);
}
}
+ memory.complete(); // drop all transient memory keys
// write the computed graph to the respective output (rdd or output format)
- final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : VertexProgramHelper.vertexComputeKeysAsArray(this.vertexProgram.getVertexComputeKeys());
- computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
+ computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index 405c9f3..8262513 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -62,7 +62,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
}
for (final MemoryComputeKey key : this.memoryKeys.values()) {
this.memory.put(key.getKey(), sparkContext.accumulator(null, key.getKey(), new MemoryAccumulator<>(key)));
-
}
this.broadcast = sparkContext.broadcast(new HashMap<>());
}
@@ -121,14 +120,14 @@ public final class SparkMemory implements Memory.Admin, Serializable {
if (this.inTask)
this.memory.get(key).add(value);
else
- this.memory.get(key).setValue(value);
+ throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
}
@Override
public void set(final String key, final Object value) {
checkKeyValue(key, value);
if (this.inTask)
- this.memory.get(key).add(value);
+ throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
else
this.memory.get(key).setValue(value);
}
@@ -138,6 +137,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
return StringFactory.memoryString(this);
}
+ protected void complete() {
+ this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(memoryComputeKey -> this.memory.remove(memoryComputeKey.getKey()));
+ }
+
protected void setInTask(final boolean inTask) {
this.inTask = inTask;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 49227af..fe5dd8d 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -141,8 +141,8 @@ public final class TinkerGraphComputer implements GraphComputer {
TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
// execute the vertex program
this.vertexProgram.setup(this.memory);
- this.memory.completeSubRound();
while (true) {
+ this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
workers.executeVertexProgram(vertexProgram -> {
@@ -162,11 +162,9 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory.completeSubRound();
if (this.vertexProgram.terminate(this.memory)) {
this.memory.incrIteration();
- this.memory.completeSubRound();
break;
} else {
this.memory.incrIteration();
- this.memory.completeSubRound();
}
}
} else {
@@ -212,12 +210,12 @@ public final class TinkerGraphComputer implements GraphComputer {
}
// update runtime and return the newly computed graph
this.memory.setRuntime(System.currentTimeMillis() - time);
- this.memory.complete();
+ this.memory.complete(); // drop all transient properties and set iteration
// determine the resultant graph based on the result graph/persist state
- final TinkerGraphComputerView view = TinkerHelper.getGraphComputerView(this.graph);
- view.complete(); // drop all transient properties
+ final TinkerGraphComputerView view = TinkerHelper.getGraphComputerView(this.graph); // can return a null view if no vertexprogram is used
+ if (null != view) view.complete(); // drop all transient properties
final Graph resultGraph = null == view ? this.graph : view.processResultGraphPersist(this.resultGraph, this.persist);
- TinkerHelper.dropGraphComputerView(this.graph);
+ TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
return new DefaultComputerResult(resultGraph, this.memory.asImmutable());
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
index 4c36e63..c9c8e18 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
@@ -128,8 +128,13 @@ public final class TinkerGraphComputerView {
}
protected void complete() {
- // this.computeKeys.values().stream().filter(ComputeKey.Vertex::isTransient).forEach(key ->
- // this.computeProperties.values().stream().flatMap(map -> map.get(key.getKey()).stream()).forEach(VertexProperty::remove));
+ // remove all transient properties from the vertices
+ for (final VertexComputeKey computeKey : this.computeKeys.values()) {
+ if (computeKey.isTransient()) {
+ final List<VertexProperty<?>> toRemove = this.computeProperties.values().stream().flatMap(map -> map.get(computeKey.getKey()).stream()).collect(Collectors.toList());
+ toRemove.forEach(VertexProperty::remove);
+ }
+ }
}
//////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/706759c1/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
index 54e064b..a1a3270 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
@@ -43,18 +43,18 @@ public final class TinkerMemory implements Memory.Admin {
public Map<String, Object> currentMap;
private final AtomicInteger iteration = new AtomicInteger(0);
private final AtomicLong runtime = new AtomicLong(0l);
+ private boolean inExecute = false;
public TinkerMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers) {
this.currentMap = new ConcurrentHashMap<>();
this.previousMap = new ConcurrentHashMap<>();
if (null != vertexProgram) {
- for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
- MemoryHelper.validateKey(key.getKey());
- this.memoryKeys.put(key.getKey(), key);
+ for (final MemoryComputeKey memoryComputeKey : vertexProgram.getMemoryComputeKeys()) {
+ this.memoryKeys.put(memoryComputeKey.getKey(), memoryComputeKey);
}
}
for (final MapReduce mapReduce : mapReducers) {
- this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), (a, b) -> b, false));
+ this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
}
}
@@ -91,11 +91,12 @@ public final class TinkerMemory implements Memory.Admin {
protected void complete() {
this.iteration.decrementAndGet();
this.previousMap = this.currentMap;
- this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(key -> this.previousMap.remove(key.getKey()));
+ this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(computeKey -> this.previousMap.remove(computeKey.getKey()));
}
protected void completeSubRound() {
this.previousMap = new ConcurrentHashMap<>(this.currentMap);
+ this.inExecute = !this.inExecute;
}
@Override
@@ -115,12 +116,16 @@ public final class TinkerMemory implements Memory.Admin {
@Override
public void set(final String key, final Object value) {
checkKeyValue(key, value);
+ if (this.inExecute)
+ throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
this.currentMap.put(key, value);
}
@Override
public void add(final String key, final Object value) {
checkKeyValue(key, value);
+ if (!this.inExecute)
+ throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
this.currentMap.compute(key, (k, v) -> null == v ? value : this.memoryKeys.get(key).getReducer().apply(v, value));
}