Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/09/12 18:22:15 UTC

tinkerpop git commit: GryoSerializer uses HadoopPools so that gryo pools are not constantly produced (object-reuse style). This has increased the performance of GryoSerializer-based jobs to that of the 3.2.x line prior to bumping to Spark 2.0.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 deb26060a -> 64e2c7b7d


GryoSerializer uses HadoopPools so that gryo pools are not constantly produced (object-reuse style). This has increased the performance of GryoSerializer-based jobs to that of the 3.2.x line prior to bumping to Spark 2.0.
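
In short, the commit swaps per-serializer pool construction for a single, statically held
pool that every caller in the JVM reuses. A minimal sketch of that reuse pattern, using a
hypothetical PoolHolder in place of the real HadoopPools (only GryoPool is an actual
TinkerPop class here; everything else is illustrative):

    import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;

    // Hypothetical holder illustrating the object-reuse style this commit adopts:
    // build the (expensive) pool once, park it in a static field, and have every
    // later caller fetch that same instance instead of constructing a new pool.
    public final class PoolHolder {
        private static GryoPool POOL;
        private static boolean INITIALIZED = false;

        public static synchronized void initialize(final GryoPool pool) {
            POOL = pool;               // store the pre-built pool for the whole JVM
            INITIALIZED = true;
        }

        public static synchronized GryoPool get() {
            // the real HadoopPools logs a warning and falls back to a default
            // pool here; throwing keeps this sketch short
            if (!INITIALIZED)
                throw new IllegalStateException("pool never initialized");
            return POOL;               // reuse, never rebuild
        }
    }

With the real classes, GryoSerializer now builds its pool once in the constructor and hands
it to HadoopPools.initialize(GryoPool), so repeated (de)serialization calls stop paying the
pool-construction cost.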


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/64e2c7b7
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/64e2c7b7
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/64e2c7b7

Branch: refs/heads/TINKERPOP-1389
Commit: 64e2c7b7dbd2a92393733f257094953bc434e775
Parents: deb2606
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 12 12:22:05 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 12 12:22:05 2016 -0600

----------------------------------------------------------------------
 .../hadoop/structure/io/HadoopPools.java        |  5 +++
 .../spark/structure/io/gryo/GryoSerializer.java | 40 +++++---------------
 2 files changed, 14 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/64e2c7b7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 5074ad5..392e97d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -52,6 +52,11 @@ public final class HadoopPools {
         HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration));
     }
 
+    public synchronized static void initialize(final GryoPool gryoPool) {
+        GRYO_POOL = gryoPool;
+        INITIALIZED = true;
+    }
+
     public static GryoPool getGryoPool() {
         if (!INITIALIZED) {
             HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the default pool");     // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.
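
The overload added above lets a caller hand HadoopPools an already-built pool rather than a
Configuration to build one from. A minimal usage sketch (the pool size of 8 is purely
illustrative, not a value taken from this commit):

    // register a pre-built pool once; later getGryoPool() calls return that instance
    HadoopPools.initialize(GryoPool.build().poolSize(8).create());
    final GryoPool pool = HadoopPools.getGryoPool();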

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/64e2c7b7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 6735fe5..00cb702 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManagerId;
 import org.apache.spark.util.SerializableConfiguration;
 import org.apache.spark.util.collection.CompactBuffer;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
@@ -49,9 +50,7 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,14 +60,9 @@ public final class GryoSerializer extends Serializer implements Serializable {
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
-    private final int poolSize;
-    private final ArrayList<String> ioRegList = new ArrayList<>();
     private final boolean referenceTracking;
     private final boolean registrationRequired;
 
-
-    private transient GryoPool gryoPool;
-
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
@@ -85,19 +79,10 @@ public final class GryoSerializer extends Serializer implements Serializable {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
-        List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
-        list.forEach(c -> {
-                    ioRegList.add(c.toString());
-                }
-        );
-    }
-
-    private GryoPool createPool(){
-        List<Object> list = new ArrayList<>(ioRegList);
-        return GryoPool.build().
-                poolSize(poolSize).
-                ioRegistries(list).
+        // create a GryoPool and store it in static HadoopPools
+        HadoopPools.initialize(GryoPool.build().
+                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
+                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -122,13 +107,13 @@ public final class GryoSerializer extends Serializer implements Serializable {
                                 .addCustom(SerializableConfiguration.class, new JavaSerializer())
                                 .addCustom(VertexWritable.class, new VertexWritableSerializer())
                                 .addCustom(ObjectWritable.class, new ObjectWritableSerializer())
-                                .referenceTracking(referenceTracking)
-                                .registrationRequired(registrationRequired);
+                                .referenceTracking(this.referenceTracking)
+                                .registrationRequired(this.registrationRequired);
                         // add these as we find ClassNotFoundExceptions
                     } catch (final ClassNotFoundException e) {
                         throw new IllegalStateException(e);
                     }
-                }).create();
+                }).create());
     }
 
     public Output newOutput() {
@@ -136,14 +121,7 @@ public final class GryoSerializer extends Serializer implements Serializable {
     }
 
     public GryoPool getGryoPool() {
-        if (gryoPool == null) {
-            synchronized (this) {
-                if (gryoPool == null) {
-                    gryoPool = createPool();
-                }
-            }
-        }
-        return this.gryoPool;
+        return HadoopPools.getGryoPool();
     }
 
     @Override
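
With the pool now held by HadoopPools, getGryoPool() above becomes a cheap static lookup
rather than double-checked locking over a transient per-serializer field. A rough sketch of
the resulting executor-side pattern, assuming GryoPool's take/offer accessors and a
GryoWriter import (error handling trimmed for brevity):

    // every (de)serialization call in the JVM shares the one registered pool
    final GryoPool pool = HadoopPools.getGryoPool();
    final GryoWriter writer = pool.takeWriter();   // borrow a pooled writer
    try {
        // ... write the object with the borrowed writer ...
    } finally {
        pool.offerWriter(writer);                  // return it for reuse
    }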