You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 14:18:51 UTC
[19/29] tinkerpop git commit: TINKERPOP-1389 Support Spark 2.0
TINKERPOP-1389 Support Spark 2.0
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/db3dfcb4
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/db3dfcb4
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/db3dfcb4
Branch: refs/heads/TINKERPOP-1389
Commit: db3dfcb4aff776a8c10db393ad5d1811607eba30
Parents: 1484c8d
Author: yucx <yu...@cn.ibm.com>
Authored: Thu Sep 1 23:11:58 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 08:13:53 2016 -0600
----------------------------------------------------------------------
giraph-gremlin/pom.xml | 2 +-
hadoop-gremlin/pom.xml | 2 +-
spark-gremlin/pom.xml | 13 +++++-
.../spark/process/computer/SparkExecutor.java | 16 ++++----
.../SparkStarBarrierInterceptor.java | 10 ++---
.../spark/structure/io/gryo/GryoSerializer.java | 43 ++++++++++++++++----
6 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/giraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
index 102f3a9..25d96ec 100644
--- a/giraph-gremlin/pom.xml
+++ b/giraph-gremlin/pom.xml
@@ -127,7 +127,7 @@ limitations under the License.
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
- <version>3.0.1</version>
+ <version>3.1.0</version>
</dependency>
<!-- TEST -->
<dependency>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 5416f18..46d318b 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -128,7 +128,7 @@ limitations under the License.
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
- <version>3.0.1</version>
+ <version>3.1.0</version>
</dependency>
<!-- TEST -->
<dependency>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index f68d76b..625579e 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -30,6 +30,11 @@
<name>Apache TinkerPop :: Spark Gremlin</name>
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-core</artifactId>
<version>${project.version}</version>
@@ -55,6 +60,10 @@
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
@@ -104,7 +113,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
- <version>1.6.1</version>
+ <version>2.0.0</version>
<exclusions>
<!-- self conflicts -->
<exclusion>
@@ -210,7 +219,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>2.4.4</version>
+ <version>2.6.5</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..8d32b36 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.spark.process.computer;
-import com.google.common.base.Optional;
+import org.apache.spark.api.java.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
@@ -65,7 +65,7 @@ public final class SparkExecutor {
public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
return graphRDD.mapPartitionsToPair(partitionIterator -> {
final GraphFilter gFilter = graphFilter.clone();
- return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
+ return IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
}, true);
}
@@ -95,7 +95,7 @@ public final class SparkExecutor {
final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
- return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+ return IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
@@ -133,7 +133,7 @@ public final class SparkExecutor {
/////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////
final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
- tuple -> () -> IteratorUtils.concat(
+ tuple -> IteratorUtils.concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
@@ -172,7 +172,7 @@ public final class SparkExecutor {
newViewIncomingRDD
.foreachPartition(partitionIterator -> {
KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
- }); // need to complete a task so its BSP and the memory for this iteration is updated
+ }); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
}
@@ -207,7 +207,7 @@ public final class SparkExecutor {
final Configuration graphComputerConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
- return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+ return new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
if (mapReduce.getMapKeySort().isPresent())
mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -218,7 +218,7 @@ public final class SparkExecutor {
final Configuration graphComputerConfiguration) {
return mapRDD.mapPartitionsToPair(partitionIterator -> {
KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
- return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+ return new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
}
@@ -227,7 +227,7 @@ public final class SparkExecutor {
final Configuration graphComputerConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
- return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+ return new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 8585e0d..dc22d47 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -85,13 +85,11 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
.filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
.flatMap(vertexWritable -> {
if (identityTraversal) // g.V.count()-style (identity)
- return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
+ return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
else { // add the vertex to head of the traversal
- return () -> { // and iterate it for its results
- final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
+ final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l));
return (Step) clone.getEndStep();
- };
}
});
// USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
@@ -133,14 +131,14 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
result = ((GroupStep) endStep).generateFinalResult(nextRDD.
mapPartitions(partitions -> {
final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
- return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+ return IteratorUtils.map(partitions, clone::projectTraverser);
}).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
} else if (endStep instanceof GroupCountStep) {
final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
result = nextRDD
.mapPartitions(partitions -> {
final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
- return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+ return IteratorUtils.map(partitions, clone::projectTraverser);
})
.fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
} else
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55..6735fe5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -24,7 +24,7 @@ import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.broadcast.TorrentBroadcast;
import org.apache.spark.network.util.ByteUnit;
import org.apache.spark.scheduler.CompressedMapStatus;
import org.apache.spark.scheduler.HighlyCompressedMapStatus;
@@ -48,24 +48,32 @@ import scala.Tuple3;
import scala.collection.mutable.WrappedArray;
import scala.runtime.BoxedUnit;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
//private final Option<String> userRegistrator;
private final int bufferSize;
private final int maxBufferSize;
+ private final int poolSize;
+ private final ArrayList<String> ioRegList = new ArrayList<>();
+ private final boolean referenceTracking;
+ private final boolean registrationRequired;
- private final GryoPool gryoPool;
+
+ private transient GryoPool gryoPool;
public GryoSerializer(final SparkConf sparkConfiguration) {
final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
- final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
- final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+ referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+ registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
} else {
@@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer {
//this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
}
}
- this.gryoPool = GryoPool.build().
- poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
- ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+ poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+ List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+ list.forEach(c -> {
+ ioRegList.add(c.toString());
+ }
+ );
+ }
+
+ private GryoPool createPool(){
+ List<Object> list = new ArrayList<>(ioRegList);
+ return GryoPool.build().
+ poolSize(poolSize).
+ ioRegistries(list).
initializeMapper(builder -> {
try {
builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -91,7 +109,7 @@ public final class GryoSerializer extends Serializer {
.addCustom(CompressedMapStatus.class)
.addCustom(BlockManagerId.class)
.addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer()) // externalizable implemented so its okay
- .addCustom(HttpBroadcast.class)
+ .addCustom(TorrentBroadcast.class)
.addCustom(PythonBroadcast.class)
.addCustom(BoxedUnit.class)
.addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
@@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer {
}
public GryoPool getGryoPool() {
+ if (gryoPool == null) {
+ synchronized (this) {
+ if (gryoPool == null) {
+ gryoPool = createPool();
+ }
+ }
+ }
return this.gryoPool;
}