You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/28 23:54:24 UTC

incubator-tinkerpop git commit: GryoPool now uses a Builder pattern. The old constructors are still there. I don't know if GryoPool is considered a 'public-facing class'... if not, let's kill the GryoPool constructors. Else, deprecate. HadoopSparkGraphPro… (subject truncated; see the full commit message below)

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/spark-gryo-tp31 a7294ea0a -> b32263e00


GryoPool now uses a Builder pattern. The old constructors are still there. I don't know if GryoPool is considered a 'public-facing class'... if not, let's kill the GryoPool constructors; otherwise, deprecate them. HadoopSparkGraphProvider now sets spark.kryo.registrationRequired to true so we can fish out any unregistered Spark classes. So far, the full test suite passes, so our current registrations are good.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b32263e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b32263e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b32263e0

Branch: refs/heads/spark-gryo-tp31
Commit: b32263e00e4a6ba281151fbb640eaa4105cbe92c
Parents: a7294ea
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 15:54:08 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 15:54:08 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     | 80 +++++++++++++++-----
 .../spark/structure/io/gryo/GryoSerializer.java | 80 ++++++++++----------
 .../computer/HadoopSparkGraphProvider.java      |  4 +-
 3 files changed, 105 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 275009f..97884bb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -48,20 +48,15 @@ public final class GryoPool {
     private Queue<GryoWriter> gryoWriters;
     private final GryoMapper mapper;
 
-    public GryoPool(final Configuration conf, final Consumer<GryoMapper.Builder> builderConsumer, final Consumer<Kryo> kryoConsumer) {
-        final GryoMapper.Builder mapperBuilder = GryoMapper.build();
-        tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.<IoRegistry>emptyList())).forEach(mapperBuilder::addRegistry);
-        builderConsumer.accept(mapperBuilder);
-        // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
-        this.mapper = mapperBuilder.create();
-        this.createPool(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER, this.mapper);
-        for (final GryoReader reader : this.gryoReaders) {
-            kryoConsumer.accept(reader.getKryo());
-        }
-        for (final GryoWriter writer : this.gryoWriters) {
-            kryoConsumer.accept(writer.getKryo());
-        }
+    public static GryoPool.Builder build() {
+        return new GryoPool.Builder();
+    }
 
+    /**
+     * Used by {@code GryoPool.Builder}.
+     */
+    private GryoPool() {
+        this.mapper = null;
     }
 
     /**
@@ -117,10 +112,6 @@ public final class GryoPool {
         }
     }
 
-    public GryoMapper getMapper() {
-        return this.mapper;
-    }
-
     public GryoReader takeReader() {
         final GryoReader reader = this.gryoReaders.poll();
         return null == reader ? GryoReader.build().mapper(mapper).create() : reader;
@@ -176,4 +167,59 @@ public final class GryoPool {
         });
         return registries;
     }
+
+    ////
+
+    public static class Builder {
+
+        private int poolSize = 256;
+        private Type type = Type.READER_WRITER;
+        private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
+        private Consumer<Kryo> kryoConsumer = null;
+        private Configuration configuration = null;
+
+        public Builder configuration(final Configuration configuration) {
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder poolSize(int poolSize) {
+            this.poolSize = poolSize;
+            return this;
+        }
+
+        public Builder type(final Type type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder initializeMapper(final Consumer<GryoMapper.Builder> gryoMapperConsumer) {
+            this.gryoMapperConsumer = gryoMapperConsumer;
+            return this;
+        }
+
+        public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
+            this.kryoConsumer = kryoConsumer;
+            return this;
+        }
+
+        public GryoPool create() {
+            final GryoMapper.Builder mapper = GryoMapper.build();
+            final GryoPool gryoPool = new GryoPool();
+            if (null != this.configuration)
+                tryCreateIoRegistry(this.configuration.getList(CONFIG_IO_REGISTRY, Collections.emptyList())).forEach(mapper::addRegistry);
+            if (null != this.gryoMapperConsumer)
+                this.gryoMapperConsumer.accept(mapper);
+            gryoPool.createPool(this.poolSize, this.type, mapper.create());
+            if (null != this.kryoConsumer) {
+                for (final GryoReader reader : gryoPool.gryoReaders) {
+                    kryoConsumer.accept(reader.getKryo());
+                }
+                for (final GryoWriter writer : gryoPool.gryoWriters) {
+                    kryoConsumer.accept(writer.getKryo());
+                }
+            }
+            return gryoPool;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index ee16126..7b12807 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -47,58 +47,58 @@ import scala.runtime.BoxedUnit;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class GryoSerializer extends Serializer {
-    private final boolean referenceTracking;
-    private final boolean registrationRequired;
+
     //private final Option<String> userRegistrator;
-    private final long bufferSizeKb;
     private final int bufferSize;
-    private final int maxBufferSizeMb;
     private final int maxBufferSize;
 
     private final GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
-        this.bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
-        if (this.bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
-            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + this.bufferSizeKb + " mb.");
+        final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
+        final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
+        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
+            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
-            this.bufferSize = (int) ByteUnit.KiB.toBytes(this.bufferSizeKb);
-            this.maxBufferSizeMb = (int) sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-            if (this.maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
-                throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + this.maxBufferSizeMb + " mb.");
+            this.bufferSize = (int) ByteUnit.KiB.toBytes(bufferSizeKb);
+            if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
+                throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + maxBufferSizeMb + " mb.");
             } else {
-                this.maxBufferSize = (int) ByteUnit.MiB.toBytes(this.maxBufferSizeMb);
-                this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-                this.registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+                this.maxBufferSize = (int) ByteUnit.MiB.toBytes(maxBufferSizeMb);
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
-
             }
         }
-        this.gryoPool = new GryoPool(makeApacheConfiguration(sparkConfiguration), builder -> {
-            try {
-                builder.
-                        addCustom(SerializableWritable.class, new JavaSerializer()).
-                        addCustom(Tuple2.class, new JavaSerializer()).
-                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                        addCustom(HttpBroadcast.class, new JavaSerializer()).
-                        addCustom(PythonBroadcast.class, new JavaSerializer()).
-                        addCustom(BoxedUnit.class, new JavaSerializer()).
-                        addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
-                        addCustom(MessagePayload.class, new JavaSerializer()).
-                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                        addCustom(ViewPayload.class, new JavaSerializer()).
-                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                        addCustom(VertexWritable.class, new JavaSerializer()).
-                        addCustom(ObjectWritable.class, new JavaSerializer());
-            } catch (final ClassNotFoundException e) {
-                throw new IllegalStateException(e);
-            }
-        }, kryo -> {
-            kryo.setRegistrationRequired(this.registrationRequired);
-            kryo.setReferences(this.referenceTracking);
-        });
-
+        this.gryoPool = GryoPool.build().
+                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+                configuration(makeApacheConfiguration(sparkConfiguration)).
+                initializeMapper(builder -> {
+                    try {
+                        builder.
+                                addCustom(SerializableWritable.class, new JavaSerializer()).
+                                addCustom(Tuple2.class, new JavaSerializer()).
+                                addCustom(CompressedMapStatus.class, new JavaSerializer()).
+                                addCustom(HttpBroadcast.class, new JavaSerializer()).
+                                addCustom(PythonBroadcast.class, new JavaSerializer()).
+                                addCustom(BoxedUnit.class, new JavaSerializer()).
+                                addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
+                                addCustom(MessagePayload.class, new JavaSerializer()).
+                                addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+                                addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+                                addCustom(ViewPayload.class, new JavaSerializer()).
+                                addCustom(SerializableConfiguration.class, new JavaSerializer()).
+                                addCustom(VertexWritable.class, new JavaSerializer()).
+                                addCustom(ObjectWritable.class, new JavaSerializer());
+                                // add these as we find ClassNotFoundExceptions
+                    } catch (final ClassNotFoundException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }).
+                initializeKryo(kryo -> {
+                    kryo.setRegistrationRequired(registrationRequired);
+                    kryo.setReferences(referenceTracking);
+                }).create();
     }
 
     public Output newOutput() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 916a3d7..a82b83f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -115,8 +115,8 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
             /// spark configuration
             put("spark.master", "local[4]");
             // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-            // put("spark.kryo.registrationRequired",true);
+            put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+            put("spark.kryo.registrationRequired", true);
         }};
     }