You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/05 20:15:12 UTC
incubator-tinkerpop git commit: got fold() and group() working with
SparkStarBarrierInterceptor. Ensured that SparkStarBarrierInterceptor doesn't
cache or partition the inputRDD. Added some logic to ensure that such
configurations are not propagated through chained OLAP jobs
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1288 1e6aa16b9 -> 3a8995b0e
got fold() and group() working with SparkStarBarrierInterceptor. Ensured that SparkStarBarrierInterceptor doesn't cache or partition the inputRDD. Added some logic to ensure that such configurations are not propagated through chained OLAP jobs (the graph computers clear these properties before returning the computer result). Also, reverted some work I did on DefaultTraversal. I will raise another issue for the (minor) problem. Integration tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3a8995b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3a8995b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3a8995b0
Branch: refs/heads/TINKERPOP-1288
Commit: 3a8995b0ef54fd2bfa10dddd879d1a2de2a23636
Parents: 1e6aa16
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 5 14:15:01 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 5 14:15:01 2016 -0600
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 3 ++
.../traversal/util/DefaultTraversal.java | 7 ++-
.../process/computer/SparkGraphComputer.java | 5 ++
.../optimization/SparkInterceptorStrategy.java | 8 ++-
.../SparkStarBarrierInterceptor.java | 56 ++++++++++++++------
.../SparkInterceptorStrategyTest.java | 32 ++++++++---
6 files changed, 82 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 3ed8618..6bd2a04 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -136,6 +136,9 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
throw new IllegalStateException(e.getMessage(), e);
}
this.memory.setRuntime(System.currentTimeMillis() - startTime);
+ // clear properties that should not be propagated in an OLAP chain
+ apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), this.memory.asImmutable());
}, exec);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
index 78db94a..5ea5668 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
@@ -297,16 +297,15 @@ public class DefaultTraversal<S, E> implements Traversal.Admin<S, E> {
@Override
public boolean equals(final Object other) {
- return other != null && other instanceof Traversal.Admin && this.equals(((Traversal.Admin) other));
+ return other != null && other.getClass().equals(this.getClass()) && this.equals(((Traversal.Admin) other));
}
@Override
public int hashCode() {
- int result = 1;
+ int result = this.getClass().hashCode();
for (final Step step : this.asAdmin().getSteps()) {
result ^= step.hashCode();
}
return result;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 4b91a10..5bc040f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -354,6 +354,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+ // clear properties that should not be propagated in an OLAP chain
+ apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+ apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+ apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index 14a23e2..19d21b3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -46,8 +46,12 @@ public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<Tr
final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph).getTraversal().get().clone();
if (!computerTraversal.isLocked())
computerTraversal.applyStrategies();
- if (SparkStarBarrierInterceptor.isLegal(computerTraversal))
- step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkStarBarrierInterceptor.class.getCanonicalName()));
+ if (SparkStarBarrierInterceptor.isLegal(computerTraversal)) {
+ step.setComputer(step.getComputer()
+ .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true)
+ .configure(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, true)
+ .configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkStarBarrierInterceptor.class.getCanonicalName()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 76272be..070f3b2 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -31,8 +31,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
@@ -41,16 +43,17 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -67,10 +70,8 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds(); // any V(1,2,3)-style ids to filter on
final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep(); // needed for the final traverser generation
- final Traversal.Admin<Object, Object> endStepTraversal = new DefaultTraversal<>();
- endStepTraversal.addStep(endStep).applyStrategies();
traversal.removeStep(0); // remove GraphStep
- traversal.removeStep(traversal.getSteps().size() - 1); // remove CountGlobalStep
+ traversal.removeStep(traversal.getSteps().size() - 1); // remove ReducingBarrierStep
traversal.applyStrategies(); // compile
boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
///////////////////////////////
@@ -89,6 +90,7 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
};
}
});
+ // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
final Object result;
if (endStep instanceof CountGlobalStep)
result = nextRDD.map(Traverser::bulk).fold(0l, (a, b) -> a + b);
@@ -109,23 +111,45 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
result = nextRDD
.map(traverser -> (Number) traverser.get())
.fold(Integer.MIN_VALUE, NumberHelper::max);
- else if (endStep instanceof GroupCountStep)
+ else if (endStep instanceof FoldStep) {
+ final BinaryOperator biOperator = endStep.getBiOperator();
+ result = nextRDD.map(traverser -> {
+ if (endStep.getSeedSupplier() instanceof ArrayListSupplier) {
+ final List list = new ArrayList<>();
+ for (long i = 0; i < traverser.bulk(); i++) {
+ list.add(traverser.get());
+ }
+ return list;
+ } else {
+ return traverser.get();
+ }
+ }).fold(endStep.getSeedSupplier().get(), biOperator::apply);
+ } else if (endStep instanceof GroupStep) {
+ ((GroupStep) endStep).onGraphComputer();
+ final GroupStep.GroupBiOperator<Object, Object> biOperator = (GroupStep.GroupBiOperator) endStep.getBiOperator();
+ result = ((GroupStep) endStep).generateFinalResult(nextRDD.
+ mapPartitions(partitions -> {
+ final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
+ return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+ }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
+ } else if (endStep instanceof GroupCountStep) {
+ final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
result = nextRDD
- .mapPartitions(partition -> {
- final Traversal.Admin<?, Map<?, Long>> clone = (Traversal.Admin) endStepTraversal.clone();
- return () -> IteratorUtils.map(partition, traverser -> TraversalUtil.apply((Traverser.Admin) traverser, clone));
+ .mapPartitions(partitions -> {
+ final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
+ return () -> IteratorUtils.map(partitions, clone::projectTraverser);
})
- .fold(((GroupCountStep<?, ?>) endStep).getSeedSupplier().get(), (a, b) -> GroupCountStep.GroupCountBiOperator.instance().apply((Map) a, (Map) b));
- else
+ .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
+ } else
throw new IllegalArgumentException("The end step is an unsupported barrier: " + endStep);
memory.setInExecute(false);
///////////////////////////////
// generate the HALTED_TRAVERSERS for the memory
final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
- haltedTraversers.add(traversal.getTraverserGenerator().generate(result, endStep, 1l));
+ haltedTraversers.add(traversal.getTraverserGenerator().generate(result, endStep, 1l)); // all reducing barrier steps produce a result of bulk 1
memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
- memory.incrIteration(); // any local star graph reduction take a single iteration
+ memory.incrIteration(); // any local star graph reduction takes a single iteration
return inputRDD;
}
@@ -139,8 +163,10 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
!endStep.getClass().equals(MeanGlobalStep.class) &&
!endStep.getClass().equals(MaxGlobalStep.class) &&
!endStep.getClass().equals(MinGlobalStep.class) &&
+ !endStep.getClass().equals(FoldStep.class) &&
+ !endStep.getClass().equals(GroupStep.class) &&
!endStep.getClass().equals(GroupCountStep.class))
- // TODO: group(), fold(), and tree()
+ // TODO: tree()
return false;
if (TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, Barrier.class, traversal).size() != 1)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a8995b0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index a43f081..24e3663 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.VertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -47,6 +48,7 @@ import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -66,7 +68,9 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
///
Graph graph = GraphFactory.open(configuration);
- GraphTraversalSource g = graph.traversal().withComputer();
+ GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkSingleIterationStrategy.class);
+ assertFalse(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
+ assertFalse(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName()));
assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
/// groupCount(m)-test
@@ -92,7 +96,9 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
///
Graph graph = GraphFactory.open(configuration);
- GraphTraversalSource g = graph.traversal().withComputer();
+ GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkSingleIterationStrategy.class);
+ assertFalse(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
+ assertFalse(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName()));
assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
/// SparkCountInterceptor matches
@@ -124,6 +130,11 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
put("person", 4l);
}}, g.V().<String>groupCount().by(T.label));
test(SparkStarBarrierInterceptor.class, Collections.singletonMap("person", 2l), g.V().has("person", "age", P.lt(30)).<String>groupCount().by(T.label));
+ test(SparkStarBarrierInterceptor.class, new HashMap<String, Long>() {{
+ put("software", 2l);
+ put("person", 4l);
+ }}, g.V().<String, Long>group().by(T.label).by(__.count()));
+ test(SparkStarBarrierInterceptor.class, 123l, g.V().hasLabel("person").values("age").fold(0l, Operator.sum));
/// No interceptor matches
test(2l, g.V().out().out().count());
test(6l, g.E().count());
@@ -142,14 +153,19 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {
final Traversal.Admin<?, ?> clone = traversal.asAdmin().clone();
clone.applyStrategies();
- final String interceptor = (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get()
- .getComputer()
- .getConfiguration()
- .getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
- if (null == expectedInterceptor)
+ final Map<String, Object> configuration = TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get().getComputer().getConfiguration();
+ final String interceptor = (String) configuration.getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
+ if (null == expectedInterceptor) { // no interceptor: neither skip flag may be set
assertNull(interceptor);
- else
+ assertFalse((Boolean) configuration.getOrDefault(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false));
+ assertFalse((Boolean) configuration.getOrDefault(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false));
+ } else {
assertEquals(expectedInterceptor, Class.forName(interceptor));
+ if (interceptor.equals(SparkStarBarrierInterceptor.class.getCanonicalName())) { // the star-barrier interceptor must disable both partitioning and graph caching
+ assertTrue((Boolean) configuration.getOrDefault(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false));
+ assertTrue((Boolean) configuration.getOrDefault(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false));
+ }
+ }
assertEquals(expectedResult, traversal.next());
}