You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/09/12 17:15:53 UTC

[2/4] tinkerpop git commit: TINKERPOP-1426, GryoSerializer should implement Java serialization interface

TINKERPOP-1426, GryoSerializer should implement Java serialization interface


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/28b68684
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/28b68684
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/28b68684

Branch: refs/heads/TINKERPOP-1389
Commit: 28b686841f00201617a1a6fdf38b2ee90e5c8a44
Parents: 1e0d7de
Author: yucx <yu...@cn.ibm.com>
Authored: Fri Sep 2 02:00:41 2016 -0700
Committer: yucx <yu...@cn.ibm.com>
Committed: Fri Sep 2 03:33:12 2016 -0700

----------------------------------------------------------------------
 .../spark/structure/io/gryo/GryoSerializer.java | 39 ++++++++++++++++----
 1 file changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28b68684/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55..e64cda2 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -48,24 +48,32 @@ import scala.Tuple3;
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
 
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
+    private final int poolSize;
+    private final ArrayList<String> ioRegList = new ArrayList<>();
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
 
-    private final GryoPool gryoPool;
+
+    private transient GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        this.gryoPool = GryoPool.build().
-                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+        List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+        list.forEach(c -> {
+                    ioRegList.add(c.toString());
+                }
+        );
+    }
+
+    private GryoPool createPool(){
+        List<Object> list = new ArrayList<>(ioRegList);
+        return GryoPool.build().
+                poolSize(poolSize).
+                ioRegistries(list).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer {
     }
 
     public GryoPool getGryoPool() {
+        if (gryoPool == null) {
+            synchronized (this) {
+                if (gryoPool == null) {
+                    gryoPool = createPool();
+                }
+            }
+        }
         return this.gryoPool;
     }