You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/03 14:43:52 UTC

[06/24] tinkerpop git commit: GryoSerializer uses HadoopPools so that Gryo pools are not constantly re-created (object-reuse style). This has increased the performance of GryoSerializer-based jobs to that of the 3.2.x line prior to bumping to Spark 2.0.

GryoSerializer uses HadoopPools so that Gryo pools are not constantly re-created (object-reuse style). This has increased the performance of GryoSerializer-based jobs to that of the 3.2.x line prior to bumping to Spark 2.0.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2321117c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2321117c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2321117c

Branch: refs/heads/master
Commit: 2321117c1fb9f5927569d9d61fa28250916b4807
Parents: f0c5a5f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 12 12:22:05 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Nov 29 04:54:21 2016 -0700

----------------------------------------------------------------------
 .../hadoop/structure/io/HadoopPools.java        |  5 +++
 .../spark/structure/io/gryo/GryoSerializer.java | 40 +++++---------------
 2 files changed, 14 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 5074ad5..392e97d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -52,6 +52,11 @@ public final class HadoopPools {
         HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration));
     }
 
+    public synchronized static void initialize(final GryoPool gryoPool) {
+        GRYO_POOL = gryoPool;
+        INITIALIZED = true;
+    }
+
     public static GryoPool getGryoPool() {
         if (!INITIALIZED) {
             HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the default pool");     // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 6735fe5..00cb702 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManagerId;
 import org.apache.spark.util.SerializableConfiguration;
 import org.apache.spark.util.collection.CompactBuffer;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
@@ -49,9 +50,7 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,14 +60,9 @@ public final class GryoSerializer extends Serializer implements Serializable {
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
-    private final int poolSize;
-    private final ArrayList<String> ioRegList = new ArrayList<>();
     private final boolean referenceTracking;
     private final boolean registrationRequired;
 
-
-    private transient GryoPool gryoPool;
-
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
@@ -85,19 +79,10 @@ public final class GryoSerializer extends Serializer implements Serializable {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
-        List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
-        list.forEach(c -> {
-                    ioRegList.add(c.toString());
-                }
-        );
-    }
-
-    private GryoPool createPool(){
-        List<Object> list = new ArrayList<>(ioRegList);
-        return GryoPool.build().
-                poolSize(poolSize).
-                ioRegistries(list).
+        // create a GryoPool and store it in static HadoopPools
+        HadoopPools.initialize(GryoPool.build().
+                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
+                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -122,13 +107,13 @@ public final class GryoSerializer extends Serializer implements Serializable {
                                 .addCustom(SerializableConfiguration.class, new JavaSerializer())
                                 .addCustom(VertexWritable.class, new VertexWritableSerializer())
                                 .addCustom(ObjectWritable.class, new ObjectWritableSerializer())
-                                .referenceTracking(referenceTracking)
-                                .registrationRequired(registrationRequired);
+                                .referenceTracking(this.referenceTracking)
+                                .registrationRequired(this.registrationRequired);
                         // add these as we find ClassNotFoundExceptions
                     } catch (final ClassNotFoundException e) {
                         throw new IllegalStateException(e);
                     }
-                }).create();
+                }).create());
     }
 
     public Output newOutput() {
@@ -136,14 +121,7 @@ public final class GryoSerializer extends Serializer implements Serializable {
     }
 
     public GryoPool getGryoPool() {
-        if (gryoPool == null) {
-            synchronized (this) {
-                if (gryoPool == null) {
-                    gryoPool = createPool();
-                }
-            }
-        }
-        return this.gryoPool;
+        return HadoopPools.getGryoPool();
     }
 
     @Override