You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/23 20:25:16 UTC
incubator-tinkerpop git commit: New Reducing-based Memory-model
implemented. A few kooky things emerged because of this and will discuss in
UPGRADE docs. However,
all-in-all this is a much nicer model which will lead to significant
performance improvements
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1166 90debb4fe -> 9416e9498
New Reducing-based Memory-model implemented. A few kooky things emerged because of this, which I will discuss in the UPGRADE docs. However, all-in-all this is a much nicer model which will lead to significant performance improvements (still need to benchmark and test). I don't think we will ever deprecate the MapReduce infrastructure. The Memory model is not as flexible and efficient (for certain jobs) as MapReduce.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9416e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9416e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9416e949
Branch: refs/heads/TINKERPOP-1166
Commit: 9416e9498a363e8ca1ef69df6cc0d046762e43e3
Parents: 90debb4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Feb 23 10:25:36 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Feb 23 10:25:36 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 13 +-
.../traversal/TraversalVertexProgram.java | 3 +
.../traversal/step/map/CountGlobalStep.java | 2 +-
.../process/traversal/step/map/FoldStep.java | 4 +-
.../traversal/step/map/GroupCountStep.java | 7 +-
.../process/traversal/step/map/GroupStep.java | 11 +-
.../traversal/step/map/GroupStepV3d0.java | 182 ++++---------------
.../traversal/step/map/MaxGlobalStep.java | 2 +-
.../traversal/step/map/MeanGlobalStep.java | 12 +-
.../traversal/step/map/MinGlobalStep.java | 2 +-
.../traversal/step/map/SumGlobalStep.java | 2 +-
.../process/traversal/step/map/TreeStep.java | 2 +-
.../gremlin/structure/io/gryo/GryoMapper.java | 26 ++-
.../hadoop/structure/io/ObjectWritable.java | 4 +-
.../process/computer/MemoryAccumulator.java | 23 +--
.../spark/process/computer/SparkMemory.java | 28 ++-
.../io/PersistedInputOutputRDDTest.java | 8 +-
17 files changed, 122 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index f3c1624..5055903 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -196,15 +196,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
// execute the job and wait until it completes (if it fails, throw an exception)
if (!job.run(true))
- throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs"); // how do I get the exception that occured?
+ throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs: " + job.getInternalJob().getStatus().getFailureInfo());
// add vertex program memory values to the return memory
- for (final MemoryComputeKey memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
- if (!memoryKey.isTransient() && storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()))) {
- final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey())));
+ for (final MemoryComputeKey memoryComputeKey : this.vertexProgram.getMemoryComputeKeys()) {
+ if (!memoryComputeKey.isTransient() && storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey()))) {
+ final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey())));
if (iterator.hasNext()) {
- this.memory.set(memoryKey.getKey(), iterator.next().getValue());
+ this.memory.set(memoryComputeKey.getKey(), iterator.next().getValue());
}
- storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()));
+ // vertex program memory items are not stored on disk
+ storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey()));
}
}
final Path path = new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 92ea9eb..1d29f86 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStepV3d0;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
@@ -189,6 +190,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
if (memory.exists(ReducingBarrierStep.REDUCING)) {
if (reducingBarrierStep instanceof GroupStep)
memory.set(ReducingBarrierStep.REDUCING, ((GroupStep) reducingBarrierStep).getReducedMap(memory.get(ReducingBarrierStep.REDUCING)));
+ else if(reducingBarrierStep instanceof GroupStepV3d0)
+ memory.set(ReducingBarrierStep.REDUCING, ((GroupStepV3d0) reducingBarrierStep).getReducedMap(memory.get(ReducingBarrierStep.REDUCING)));
else
memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 4d72146..2c7cf0c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -55,7 +55,7 @@ public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> {
///////////
- private static class CountBiOperator implements BinaryOperator<Long>, Serializable {
+ public static class CountBiOperator implements BinaryOperator<Long>, Serializable {
@Override
public Long apply(final Long mutatingSeed, final Long count) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
index d9dec06..130f53a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
@@ -72,7 +72,7 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
/////////
- private static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
+ public static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
@Override
public List<S> apply(final List<S> mutatingSeed, final List<S> list) {
mutatingSeed.addAll(list);
@@ -80,7 +80,7 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
}
}
- private static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
+ public static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
private final BiFunction biFunction;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
index 966d17a..8180950 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
@@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import java.io.Serializable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -62,9 +61,7 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
}
public Map<E, Long> projectTraverser(final Traverser.Admin<S> traverser) {
- final Map<E, Long> map = new HashMap<>(); // TODO: make singleton map
- map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), traverser.bulk());
- return map;
+ return Collections.singletonMap(TraversalUtil.applyNullable(traverser, this.keyTraversal), traverser.bulk());
}
@Override
@@ -96,7 +93,7 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
///////////
- private static class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
+ public static final class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
@Override
public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Map<E, Long> map) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index 00d9a8d..ba4a840 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -79,9 +80,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
} else
traverserSet.add(traverser);
- final Map<K, V> map = new HashMap<>();
- map.put(key, (V) traverserSet);
- return map;
+ return Collections.singletonMap(key, (V) traverserSet);
}
@Override
@@ -152,7 +151,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
///////////
- private static class GroupComputerBiOperator<S, K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+ public static final class GroupComputerBiOperator<S, K, V> implements BinaryOperator<Map<K, V>>, Serializable {
private GroupComputerBiOperator() {
}
@@ -171,11 +170,11 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
}
}
- private static class GroupStandardBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+ public static final class GroupStandardBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
private final GroupStep groupStep;
- private GroupStandardBiOperator(final GroupStep groupStep) {
+ public GroupStandardBiOperator(final GroupStep groupStep) {
this.groupStep = groupStep;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index a2b7f5e..b72dce5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -19,67 +19,62 @@
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import java.util.function.Supplier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@Deprecated
-public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<K, R>> implements MapReducer, ByModulating, GraphComputing, TraversalParent {
+public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<K, R>> implements ByModulating, GraphComputing, TraversalParent {
private char state = 'k';
private Traversal.Admin<S, K> keyTraversal = null;
private Traversal.Admin<S, V> valueTraversal = null;
private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
- private boolean byPass = false;
public GroupStepV3d0(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier((Supplier) new GroupMapSupplierV3d0());
- // this.setBiFunction(new GroupBiFunction(this));
+ this.setReducingBiOperator(new GroupBiOperatorV3d0<>());
}
@Override
- public Map<K, R> projectTraverser(Traverser.Admin<S> traverser) {
- return null;
+ public Map<K, R> projectTraverser(final Traverser.Admin<S> traverser) {
+ final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal);
+ final BulkSet<V> values = new BulkSet<>();
+ final V value = TraversalUtil.applyNullable(traverser, this.valueTraversal);
+ TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+ return Collections.singletonMap(key, (R) values);
}
@Override
public void onGraphComputer() {
- this.byPass = true;
+ this.setSeedSupplier((Supplier) HashMapSupplier.instance());
+ this.setReducingBiOperator(new GroupBiOperatorV3d0<>());
}
@Override
@@ -94,10 +89,6 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
return children;
}
- public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
- return this.reduceTraversal;
- }
-
@Override
public void modulateBy(final Traversal.Admin<?, ?> kvrTraversal) {
if ('k' == this.state) {
@@ -128,7 +119,6 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
if (null != this.reduceTraversal)
clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
- // clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
return clone;
}
@@ -142,63 +132,49 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
}
@Override
- public MapReduce<K, Collection<V>, K, R, Map<K, R>> getMapReduce() {
- return new GroupMapReduceV3d0<>(this);
+ public String toString() {
+ return StringFactory.stepString(this, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
}
- @Override
- public Traverser<Map<K, R>> processNextStart() {
- if (this.byPass) {
- final Traverser.Admin<S> traverser = this.starts.next();
- final Object[] kvPair = new Object[]{TraversalUtil.applyNullable(traverser, (Traversal.Admin<S, Map>) this.keyTraversal), TraversalUtil.applyNullable(traverser, (Traversal.Admin<S, Map>) this.valueTraversal)};
- return traverser.asAdmin().split(kvPair, (Step) this);
- } else {
- return super.processNextStart();
+ public Map<K, R> getReducedMap(final Map<K, Collection<V>> valueMap) {
+ final Map<K, R> reducedMap = new HashMap<>();
+ for (final K key : valueMap.keySet()) {
+ final R r = TraversalUtil.applyNullable(valueMap.get(key), this.reduceTraversal);
+ reducedMap.put(key, r);
}
+ return reducedMap;
}
- @Override
- public String toString() {
- return StringFactory.stepString(this, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
- }
+ ////////////
- ///////////
+ public static class GroupBiOperatorV3d0<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
- private static class GroupBiFunction<S, K, V> implements BiFunction<Map<K, Collection<V>>, Traverser.Admin<S>, Map<K, Collection<V>>>, Serializable {
+ private GroupBiOperatorV3d0() {
- private final GroupStepV3d0<S, K, V, ?> groupStep;
-
- private GroupBiFunction(final GroupStepV3d0<S, K, V, ?> groupStep) {
- this.groupStep = groupStep;
}
@Override
- public Map<K, Collection<V>> apply(final Map<K, Collection<V>> mutatingSeed, final Traverser.Admin<S> traverser) {
- final K key = TraversalUtil.applyNullable(traverser, this.groupStep.keyTraversal);
- final V value = TraversalUtil.applyNullable(traverser, this.groupStep.valueTraversal);
- Collection<V> values = mutatingSeed.get(key);
- if (null == values) {
- values = new BulkSet<>();
- mutatingSeed.put(key, values);
+ public Map<K, V> apply(final Map<K, V> mutatingSeed, final Map<K, V> map) {
+ for (final K key : map.keySet()) {
+ final BulkSet<V> values = (BulkSet<V>) map.get(key);
+ BulkSet<V> seedValues = (BulkSet<V>) mutatingSeed.get(key);
+ if (null == seedValues) {
+ seedValues = new BulkSet<>();
+ mutatingSeed.put(key, (V) seedValues);
+ }
+ seedValues.addAll(values);
}
- TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
return mutatingSeed;
}
}
- //////////
+ ////////
private class GroupMapV3d0 extends HashMap<K, Collection<V>> implements FinalGet<Map<K, R>> {
@Override
public Map<K, R> getFinal() {
- if (null == GroupStepV3d0.this.reduceTraversal)
- return (Map<K, R>) this;
- else {
- final Map<K, R> reduceMap = new HashMap<>();
- this.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.applyNullable(vv, GroupStepV3d0.this.reduceTraversal)));
- return reduceMap;
- }
+ return GroupStepV3d0.this.getReducedMap(this);
}
}
@@ -212,92 +188,4 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
return new GroupMapV3d0();
}
}
-
- ///////////
-
- public static final class GroupMapReduceV3d0<K, V, R> implements MapReduce<K, Collection<V>, K, R, Map<K, R>> {
-
- public static final String GROUP_BY_STEP_STEP_ID = "gremlin.groupStep.stepId";
-
- private String groupStepId;
- private Traversal.Admin<Collection<V>, R> reduceTraversal;
-
- private GroupMapReduceV3d0() {
-
- }
-
- public GroupMapReduceV3d0(final GroupStepV3d0<?, K, V, R> step) {
- this.groupStepId = step.getId();
- this.reduceTraversal = step.getReduceTraversal();
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- MapReduce.super.storeState(configuration);
- configuration.setProperty(GROUP_BY_STEP_STEP_ID, this.groupStepId);
- }
-
- @Override
- public void loadState(final Graph graph, final Configuration configuration) {
- this.groupStepId = configuration.getString(GROUP_BY_STEP_STEP_ID);
- this.reduceTraversal = ((GroupStepV3d0) new TraversalMatrix<>(TraversalVertexProgram.getTraversal(graph, configuration)).getStepById(this.groupStepId)).getReduceTraversal();
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return !stage.equals(Stage.COMBINE);
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<K, Collection<V>> emitter) {
- vertex.<TraverserSet<Object[]>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> {
- final Object[] objects = traverser.get();
- for (int i = 0; i < traverser.bulk(); i++) {
- if (objects[1] instanceof Collection)
- emitter.emit((K) objects[0], (Collection<V>) objects[1]);
- else {
- final List<V> collection = new ArrayList<>();
- collection.add((V) objects[1]);
- emitter.emit((K) objects[0], collection);
- }
- }
- }));
- }
-
- @Override
- public void reduce(final K key, final Iterator<Collection<V>> values, final ReduceEmitter<K, R> emitter) {
- final Set<V> set = new BulkSet<>();
- values.forEachRemaining(set::addAll);
- emitter.emit(key, TraversalUtil.applyNullable(set, this.reduceTraversal));
- }
-
- @Override
- public Map<K, R> generateFinalResult(final Iterator<KeyValue<K, R>> keyValues) {
- final Map<K, R> map = new HashMap<>();
- keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
- return map;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public GroupMapReduceV3d0<K, V, R> clone() {
- try {
- final GroupMapReduceV3d0<K, V, R> clone = (GroupMapReduceV3d0<K, V, R>) super.clone();
- if (null != clone.reduceTraversal)
- clone.reduceTraversal = this.reduceTraversal.clone();
- return clone;
- } catch (final CloneNotSupportedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public String toString() {
- return StringFactory.mapReduceString(this, this.getMemoryKey());
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
index 7db28e5..c9466db 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
@@ -57,7 +57,7 @@ public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S
/////
- private static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+ public static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
public S apply(final S mutatingSeed, final S number) {
return !NAN.equals(mutatingSeed) ? (S) max(mutatingSeed, number) : number;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
index 53688cd..1f8c99e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
@@ -61,11 +61,19 @@ public final class MeanGlobalStep<S extends Number, E extends Number> extends Re
/////
- private static class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+ public static final class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
public S apply(final S mutatingSeed, final S number) {
- return (S) (number instanceof MeanNumber ? ((MeanNumber) mutatingSeed).add((MeanNumber) number) : ((MeanNumber) mutatingSeed).add(number, 1l));
+ if (mutatingSeed instanceof MeanNumber) {
+ return (number instanceof MeanNumber) ?
+ (S) ((MeanNumber) mutatingSeed).add((MeanNumber) number) :
+ (S) ((MeanNumber) mutatingSeed).add(number, 1l);
+ } else {
+ return (number instanceof MeanNumber) ?
+ (S) ((MeanNumber) number).add(mutatingSeed, 1l) :
+ (S) new MeanNumber(number, 1l).add(mutatingSeed, 1l);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
index a1bc2c4..bdcad4c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
@@ -56,7 +56,7 @@ public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S
/////
- private static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+ public static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
public S apply(final S mutatingSeed, final S number) {
return !NAN.equals(mutatingSeed) ? (S) min(mutatingSeed, number) : number;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
index bea7717..2311344 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
@@ -61,7 +61,7 @@ public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S
/////
- private static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+ public static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
public S apply(final S mutatingSeed, final S number) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
index fa41210..3aeecbf 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
@@ -109,7 +109,7 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements T
///////////
- private static class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
+ public static final class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
@Override
public Tree apply(final Tree mutatingSeed, final Tree tree) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 94fd7d6..c8fceb7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -23,6 +23,16 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.Contains;
import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStepV3d0;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TreeStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser;
@@ -317,7 +327,21 @@ public final class GryoMapper implements Mapper<Kryo> {
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.OrOperator.class, null, 108));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SetOperator.class, null, 109));
add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumLongOperator.class, null, 110));
- add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111)); // ***LAST ID**
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111));
+
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(CountGlobalStep.CountBiOperator.class, null, 112));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(FoldStep.ListBiOperator.class, null, 113));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(FoldStep.FoldBiOperator.class, null, 114));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupCountStep.GroupCountBiOperator.class, null, 115));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStep.GroupComputerBiOperator.class, null, 116));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStep.GroupStandardBiOperator.class, null, 117));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MaxGlobalStep.MaxGlobalBiOperator.class, null, 118));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanGlobalBiOperator.class, null, 119));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanNumber.class, null, 120));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MinGlobalStep.MinGlobalBiOperator.class, null, 121));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(SumGlobalStep.SumGlobalBiOperator.class, null, 122));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeStep.TreeBiOperator.class, null, 123));
+ add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStepV3d0.GroupBiOperatorV3d0.class, null, 124)); // ***LAST ID**
}};
private final List<IoRegistry> registries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index 5c45400..a6b11ca 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -68,7 +68,7 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
// the type is embedded in the stream so it can just read it from there and return it as needed.
// presumably that will cast nicely to T
return (T) gryoReader.readObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)), Object.class);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
});
@@ -81,7 +81,7 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
gryoWriter.writeObject(outputStream, this.t);
WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
index 8e88b9f..cf8cb25 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
@@ -20,12 +20,13 @@
package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MemoryAccumulator<A> implements AccumulatorParam<A> {
+public final class MemoryAccumulator<A> implements AccumulatorParam<ObjectWritable<A>> {
private final MemoryComputeKey<A> memoryComputeKey;
@@ -34,25 +35,21 @@ public final class MemoryAccumulator<A> implements AccumulatorParam<A> {
}
@Override
- public A addAccumulator(final A a, final A b) {
- if (null == a)
+ public ObjectWritable<A> addAccumulator(final ObjectWritable<A> a, final ObjectWritable<A> b) {
+ if (a.isEmpty())
return b;
- if (null == b)
+ if (b.isEmpty())
return a;
- return this.memoryComputeKey.getReducer().apply(a, b);
+ return new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(a.get(), b.get()));
}
@Override
- public A addInPlace(final A a, final A b) {
- if (null == a)
- return b;
- if (null == b)
- return a;
- return this.memoryComputeKey.getReducer().apply(a, b);
+ public ObjectWritable<A> addInPlace(final ObjectWritable<A> a, final ObjectWritable<A> b) {
+ return this.addAccumulator(a, b);
}
@Override
- public A zero(final A a) {
- return null;
+ public ObjectWritable<A> zero(final ObjectWritable<A> a) {
+ return ObjectWritable.empty();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index 8262513..cb672e1 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
@@ -46,22 +47,23 @@ public final class SparkMemory implements Memory.Admin, Serializable {
public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
private final AtomicInteger iteration = new AtomicInteger(0); // do these need to be atomics?
private final AtomicLong runtime = new AtomicLong(0l);
- private final Map<String, Accumulator> memory = new HashMap<>();
+ private final Map<String, Accumulator<ObjectWritable>> memory = new HashMap<>();
private Broadcast<Map<String, Object>> broadcast;
private boolean inTask = false;
public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
if (null != vertexProgram) {
for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
- MemoryHelper.validateKey(key.getKey());
this.memoryKeys.put(key.getKey(), key);
}
}
for (final MapReduce mapReduce : mapReducers) {
this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
}
- for (final MemoryComputeKey key : this.memoryKeys.values()) {
- this.memory.put(key.getKey(), sparkContext.accumulator(null, key.getKey(), new MemoryAccumulator<>(key)));
+ for (final MemoryComputeKey memoryComputeKey : this.memoryKeys.values()) {
+ this.memory.put(
+ memoryComputeKey.getKey(),
+ sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey)));
}
this.broadcast = sparkContext.broadcast(new HashMap<>());
}
@@ -73,7 +75,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
else {
final Set<String> trueKeys = new HashSet<>();
this.memory.forEach((key, value) -> {
- if (value.value() != null)
+ if (!value.value().isEmpty())
trueKeys.add(key);
});
return Collections.unmodifiableSet(trueKeys);
@@ -107,18 +109,18 @@ public final class SparkMemory implements Memory.Admin, Serializable {
@Override
public <R> R get(final String key) throws IllegalArgumentException {
- final R r = this.getValue(key);
- if (null == r)
+ final ObjectWritable<R> r = (ObjectWritable<R>) (this.inTask ? this.broadcast.value().get(key) : this.memory.get(key).value());
+ if (r.isEmpty())
throw Memory.Exceptions.memoryDoesNotExist(key);
else
- return r;
+ return r.get();
}
@Override
public void add(final String key, final Object value) {
checkKeyValue(key, value);
if (this.inTask)
- this.memory.get(key).add(value);
+ this.memory.get(key).add(new ObjectWritable<>(value));
else
throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
}
@@ -129,7 +131,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
if (this.inTask)
throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
else
- this.memory.get(key).setValue(value);
+ this.memory.get(key).setValue(new ObjectWritable<>(value));
}
@Override
@@ -149,7 +151,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
this.broadcast.destroy(true); // do we need to block?
final Map<String, Object> toBroadcast = new HashMap<>();
this.memory.forEach((key, object) -> {
- if (null != object.value())
+ if (!object.value().isEmpty())
toBroadcast.put(key, object.value());
});
this.broadcast = sparkContext.broadcast(toBroadcast);
@@ -160,8 +162,4 @@ public final class SparkMemory implements Memory.Admin, Serializable {
throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
MemoryHelper.validateValue(value);
}
-
- private <R> R getValue(final String key) {
- return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 040dd39..dd2dec7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -139,7 +139,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
@Test
public void shouldPersistRDDAcrossJobs() throws Exception {
-
+ Spark.create("local[4]");
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
final Configuration configuration = super.getBaseConfiguration();
@@ -157,8 +157,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
"gremlin-groovy",
"g.V().count()").create(graph)).submit().get();
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
- assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
- assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
///////
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
@@ -167,8 +166,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
graph = GraphFactory.open(configuration);
assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue());
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
- assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
- assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
Spark.close();
}