You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 14:18:51 UTC

[19/29] tinkerpop git commit: TINKERPOP-1389 Support Spark 2.0

TINKERPOP-1389 Support Spark 2.0


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/db3dfcb4
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/db3dfcb4
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/db3dfcb4

Branch: refs/heads/TINKERPOP-1389
Commit: db3dfcb4aff776a8c10db393ad5d1811607eba30
Parents: 1484c8d
Author: yucx <yu...@cn.ibm.com>
Authored: Thu Sep 1 23:11:58 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 08:13:53 2016 -0600

----------------------------------------------------------------------
 giraph-gremlin/pom.xml                          |  2 +-
 hadoop-gremlin/pom.xml                          |  2 +-
 spark-gremlin/pom.xml                           | 13 +++++-
 .../spark/process/computer/SparkExecutor.java   | 16 ++++----
 .../SparkStarBarrierInterceptor.java            | 10 ++---
 .../spark/structure/io/gryo/GryoSerializer.java | 43 ++++++++++++++++----
 6 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/giraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
index 102f3a9..25d96ec 100644
--- a/giraph-gremlin/pom.xml
+++ b/giraph-gremlin/pom.xml
@@ -127,7 +127,7 @@ limitations under the License.
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
         </dependency>
         <!-- TEST -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 5416f18..46d318b 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -128,7 +128,7 @@ limitations under the License.
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
         </dependency>
         <!-- TEST -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index f68d76b..625579e 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -30,6 +30,11 @@
     <name>Apache TinkerPop :: Spark Gremlin</name>
     <dependencies>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.tinkerpop</groupId>
             <artifactId>gremlin-core</artifactId>
             <version>${project.version}</version>
@@ -55,6 +60,10 @@
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
                 <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
                     <groupId>com.sun.jersey</groupId>
                     <artifactId>jersey-core</artifactId>
                 </exclusion>
@@ -104,7 +113,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
-            <version>1.6.1</version>
+            <version>2.0.0</version>
             <exclusions>
                 <!-- self conflicts -->
                 <exclusion>
@@ -210,7 +219,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.4.4</version>
+            <version>2.6.5</version>
         </dependency>
         <dependency>
             <groupId>commons-lang</groupId>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..8d32b36 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
-import com.google.common.base.Optional;
+import org.apache.spark.api.java.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function2;
@@ -65,7 +65,7 @@ public final class SparkExecutor {
     public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
-            return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
+            return IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
         }, true);
     }
 
@@ -95,7 +95,7 @@ public final class SparkExecutor {
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+                    return IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
@@ -133,7 +133,7 @@ public final class SparkExecutor {
         /////////////////////////////////////////////////////////////
         /////////////////////////////////////////////////////////////
         final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.concat(
+                tuple -> IteratorUtils.concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
@@ -172,7 +172,7 @@ public final class SparkExecutor {
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-                }); // need to complete a task so its BSP and the memory for this iteration is updated
+                }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
 
@@ -207,7 +207,7 @@ public final class SparkExecutor {
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -218,7 +218,7 @@ public final class SparkExecutor {
                                                                     final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
 
@@ -227,7 +227,7 @@ public final class SparkExecutor {
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 8585e0d..dc22d47 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -85,13 +85,11 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
                 .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                 .flatMap(vertexWritable -> {
                     if (identityTraversal)                          // g.V.count()-style (identity)
-                        return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
+                        return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
                     else {                                          // add the vertex to head of the traversal
-                        return () -> {                              // and iterate it for its results
-                            final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
+                        final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
                             clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l));
                             return (Step) clone.getEndStep();
-                        };
                     }
                 });
         // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
@@ -133,14 +131,14 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
             result = ((GroupStep) endStep).generateFinalResult(nextRDD.
                     mapPartitions(partitions -> {
                         final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
-                        return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                        return IteratorUtils.map(partitions, clone::projectTraverser);
                     }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
         } else if (endStep instanceof GroupCountStep) {
             final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
             result = nextRDD
                     .mapPartitions(partitions -> {
                         final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
-                        return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                        return IteratorUtils.map(partitions, clone::projectTraverser);
                     })
                     .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
         } else

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/db3dfcb4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55..6735fe5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -24,7 +24,7 @@ import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.broadcast.TorrentBroadcast;
 import org.apache.spark.network.util.ByteUnit;
 import org.apache.spark.scheduler.CompressedMapStatus;
 import org.apache.spark.scheduler.HighlyCompressedMapStatus;
@@ -48,24 +48,32 @@ import scala.Tuple3;
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
 
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
+    private final int poolSize;
+    private final ArrayList<String> ioRegList = new ArrayList<>();
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
 
-    private final GryoPool gryoPool;
+
+    private transient GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        this.gryoPool = GryoPool.build().
-                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+        List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+        list.forEach(c -> {
+                    ioRegList.add(c.toString());
+                }
+        );
+    }
+
+    private GryoPool createPool(){
+        List<Object> list = new ArrayList<>(ioRegList);
+        return GryoPool.build().
+                poolSize(poolSize).
+                ioRegistries(list).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -91,7 +109,7 @@ public final class GryoSerializer extends Serializer {
                                 .addCustom(CompressedMapStatus.class)
                                 .addCustom(BlockManagerId.class)
                                 .addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer())   // externalizable implemented so its okay
-                                .addCustom(HttpBroadcast.class)
+                                .addCustom(TorrentBroadcast.class)
                                 .addCustom(PythonBroadcast.class)
                                 .addCustom(BoxedUnit.class)
                                 .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
@@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer {
     }
 
     public GryoPool getGryoPool() {
+        if (gryoPool == null) {
+            synchronized (this) {
+                if (gryoPool == null) {
+                    gryoPool = createPool();
+                }
+            }
+        }
         return this.gryoPool;
     }