You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/29 19:49:30 UTC

[03/14] incubator-tinkerpop git commit: GryoPool constructor added, GryoReader/Writer now have getKryo() public methods. GryoSerializer now exists as a Spark serializer which will connect with the Graph and get its registered serializers. Updated conf/pr

GryoPool constructor added, GryoReader/Writer now have getKryo() public methods. GryoSerializer now exists as a Spark serializer which will connect with the Graph and get its registered serializers. Updated conf/properties.files with GryoSerializer as the new default serializer instead of Sparks KryoSerializer. Uncommented GroupCountTest test which failed due to serialization issue in 3.0.x.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fde13ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fde13ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fde13ed5

Branch: refs/heads/master
Commit: fde13ed5d9db8c18bd9e0d8eb4541755aa181697
Parents: a8b1439
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 13:03:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 13:03:11 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     |  36 +++++-
 .../gremlin/structure/io/gryo/GryoReader.java   |  66 +++++-----
 .../gremlin/structure/io/gryo/GryoWriter.java   |   4 +
 .../step/sideEffect/GroupCountTest.java         |   1 -
 hadoop-gremlin/conf/hadoop-gryo.properties      |   2 +-
 hadoop-gremlin/conf/hadoop-script.properties    |   2 +-
 .../io/gryo/GryoDeserializationStream.groovy    |  63 ++++++++++
 .../spark/process/computer/io/InputRDD.java     |   2 +-
 .../io/gryo/GryoDeserializationStream.java      |  55 --------
 .../io/gryo/GryoSerializationStream.java        |  69 ----------
 .../computer/io/gryo/GryoSerializer.java        |  62 ---------
 .../io/gryo/GryoSerializerInstance.java         |  77 ------------
 .../io/gryo/GryoSerializationStream.java        |  59 +++++++++
 .../spark/structure/io/gryo/GryoSerializer.java | 125 +++++++++++++++++++
 .../io/gryo/GryoSerializerInstance.java         |  87 +++++++++++++
 .../computer/HadoopSparkGraphProvider.java      |   2 +-
 16 files changed, 407 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 426997e..275009f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -47,6 +48,22 @@ public final class GryoPool {
     private Queue<GryoWriter> gryoWriters;
     private final GryoMapper mapper;
 
+    public GryoPool(final Configuration conf, final Consumer<GryoMapper.Builder> builderConsumer, final Consumer<Kryo> kryoConsumer) {
+        final GryoMapper.Builder mapperBuilder = GryoMapper.build();
+        tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.<IoRegistry>emptyList())).forEach(mapperBuilder::addRegistry);
+        builderConsumer.accept(mapperBuilder);
+        // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
+        this.mapper = mapperBuilder.create();
+        this.createPool(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER, this.mapper);
+        for (final GryoReader reader : this.gryoReaders) {
+            kryoConsumer.accept(reader.getKryo());
+        }
+        for (final GryoWriter writer : this.gryoWriters) {
+            kryoConsumer.accept(writer.getKryo());
+        }
+
+    }
+
     /**
      * Create a pool of readers and writers from a {@code Configuration} object.  There are two configuration keys
      * expected: "gremlin.io.registry" which defines comma separated list of the fully qualified class names of
@@ -73,30 +90,37 @@ public final class GryoPool {
      * Create a pool of a readers, writers or both of the specified size with an optional {@link IoRegistry} object
      * which would allow custom serializers to be registered to the pool.
      *
-     * @param poolSize initial size of the pool.
-     * @param type the type of pool.
+     * @param poolSize   initial size of the pool.
+     * @param type       the type of pool.
      * @param registries a list of registries to assign to each {@link GryoReader} and {@link GryoWriter} instances.
      */
     public GryoPool(final int poolSize, final Type type, final List<IoRegistry> registries) {
         final GryoMapper.Builder mapperBuilder = GryoMapper.build();
         registries.forEach(mapperBuilder::addRegistry);
-
         // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
-        mapper = mapperBuilder.create();
+        this.mapper = mapperBuilder.create();
+        createPool(poolSize, type, mapper);
+    }
+
+    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
         if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
             this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
             for (int i = 0; i < poolSize; i++) {
-                this.gryoReaders.add(GryoReader.build().mapper(mapper).create());
+                this.gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
             }
         }
         if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
             this.gryoWriters = new LinkedBlockingQueue<>(poolSize);
             for (int i = 0; i < poolSize; i++) {
-                this.gryoWriters.add(GryoWriter.build().mapper(mapper).create());
+                this.gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
             }
         }
     }
 
+    public GryoMapper getMapper() {
+        return this.mapper;
+    }
+
     public GryoReader takeReader() {
         final GryoReader reader = this.gryoReaders.poll();
         return null == reader ? GryoReader.build().mapper(mapper).create() : reader;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index 234a04e..ebc0ebc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -18,24 +18,23 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
 import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.Host;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 
 import java.io.IOException;
@@ -68,12 +67,16 @@ public final class GryoReader implements GraphReader {
         this.batchSize = batchSize;
     }
 
+    public Kryo getKryo() {
+        return this.kryo;
+    }
+
     /**
      * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
-     *                    {@link GraphWriter#writeGraph(OutputStream, Graph)}.
+     * @param inputStream    a stream containing an entire graph of vertices and edges as defined by the accompanying
+     *                       {@link GraphWriter#writeGraph(OutputStream, Graph)}.
      * @param graphToWriteTo the graph to write to when reading from the stream.
      * @throws IOException
      */
@@ -81,7 +84,7 @@ public final class GryoReader implements GraphReader {
     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
         // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
         // have vertex labels in the output we can't do this single pass
-        final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
+        final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
         final AtomicLong counter = new AtomicLong(0);
 
         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
@@ -112,11 +115,11 @@ public final class GryoReader implements GraphReader {
      * Read {@link Vertex} objects from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
-     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
-     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
+     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
@@ -131,8 +134,8 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Vertex}  from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least a single vertex as defined by the accompanying
-     *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
+     * @param inputStream        a stream containing at least a single vertex as defined by the accompanying
+     *                           {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
      */
     @Override
@@ -144,10 +147,10 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
-     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
-     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
+     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
@@ -163,8 +166,8 @@ public final class GryoReader implements GraphReader {
      * Read an {@link Edge} from output generated by {@link GryoWriter#writeEdge(OutputStream, Edge)} or via
      * an {@link Edge} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
-     *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
+     * @param inputStream      a stream containing at least one {@link Edge} as defined by the accompanying
+     *                         {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      */
     @Override
@@ -180,14 +183,14 @@ public final class GryoReader implements GraphReader {
      * {@link GryoWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
      * to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
-     *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
+     * @param inputStream                a stream containing at least one {@link VertexProperty} as written by the accompanying
+     *                                   {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
      *                                   {@link Host} object.
      */
     @Override
-    public VertexProperty readVertexProperty (final InputStream inputStream,
-                                              final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
+    public VertexProperty readVertexProperty(final InputStream inputStream,
+                                             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
         final Input input = new Input(inputStream);
         readHeader(input);
         final Attachable<VertexProperty> attachable = kryo.readObject(input, DetachedVertexProperty.class);
@@ -198,8 +201,8 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Property} from output generated by  {@link GryoWriter#writeProperty(OutputStream, Property)} or
      * via an {@link Property} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
-     *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
+     * @param inputStream          a stream containing at least one {@link Property} as written by the accompanying
+     *                             {@link GraphWriter#writeProperty(OutputStream, Property)} method.
      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
      */
     @Override
@@ -215,7 +218,7 @@ public final class GryoReader implements GraphReader {
      * {@inheritDoc}
      */
     @Override
-    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException{
+    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
         return clazz.cast(this.kryo.readClassAndObject(new Input(inputStream)));
     }
 
@@ -230,7 +233,8 @@ public final class GryoReader implements GraphReader {
         kryo.readClassAndObject(input);
 
         final Vertex v = vertexMaker.apply(starGraph.getStarVertex());
-        if (edgeMaker != null) starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
+        if (edgeMaker != null)
+            starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
         return v;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
index d98b8c2..8ca8e51 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
@@ -51,6 +51,10 @@ public final class GryoWriter implements GraphWriter {
         this.kryo = gryoMapper.createMapper();
     }
 
+    public Kryo getKryo() {
+        return this.kryo;
+    }
+
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
index 3bf232c..616daa2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
@@ -84,7 +84,6 @@ public abstract class GroupCountTest extends AbstractGremlinProcessTest {
 
     @Test
     @LoadGraphWith(MODERN)
-    @Ignore // TODO: fix Spark integration
     public void g_V_outXcreatedX_groupCountXxX_capXxX() {
         final Traversal<Vertex, Map<Vertex, Long>> traversal = get_g_V_outXcreatedX_groupCountXxX_capXxX();
         final Object lopId = convertToVertexId("lop");

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index e92d517..75fb11f 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -27,7 +27,7 @@ gremlin.hadoop.outputLocation=output
 ####################################
 spark.master=local[4]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 # spark.kryo.registrationRequired=true
 # spark.storage.memoryFraction=0.2
 # spark.eventLog.enabled=true

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-script.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-script.properties b/hadoop-gremlin/conf/hadoop-script.properties
index 1099394..3b41598 100644
--- a/hadoop-gremlin/conf/hadoop-script.properties
+++ b/hadoop-gremlin/conf/hadoop-script.properties
@@ -28,7 +28,7 @@ gremlin.hadoop.outputLocation=output
 ####################################
 spark.master=local[4]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 # spark.kryo.registrationRequired=true
 # spark.storage.memoryFraction=0.2
 # spark.eventLog.enabled=true

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
new file mode 100644
index 0000000..6c6296d
--- /dev/null
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
+
+import org.apache.spark.serializer.DeserializationStream
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader
+import org.apache.tinkerpop.shaded.kryo.KryoException
+import org.apache.tinkerpop.shaded.kryo.io.Input
+import scala.reflect.ClassTag
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoDeserializationStream extends DeserializationStream {
+
+    private final Input input;
+    private final GryoSerializerInstance gryoSerializer;
+
+    public GryoDeserializationStream(final GryoSerializerInstance gryoSerializer, final InputStream inputStream) {
+        this.gryoSerializer = gryoSerializer;
+        this.input = new Input(inputStream);
+    }
+
+    @Override
+    public <T> T readObject(final ClassTag<T> classTag) {
+        try {
+            final GryoReader gryoReader = this.gryoSerializer.getGryoPool().takeReader();
+            final T t = (T) gryoReader.getKryo().readClassAndObject(this.input);
+            this.gryoSerializer.getGryoPool().offerReader(gryoReader);
+            return t;
+        } catch (final Throwable e) {
+            if (e instanceof KryoException) {
+                final KryoException kryoException = (KryoException) e;
+                if (kryoException.getMessage().toLowerCase().contains("buffer underflow")) {
+                    throw new EOFException();
+                }
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void close() {
+        this.input.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
index 19d79a8..291fcd3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -33,7 +33,7 @@ public interface InputRDD {
 
     /**
      * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
+     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
      * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
      * @return an adjacency list representation of the underlying graph system.
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
deleted file mode 100644
index 1d8039a..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoDeserializationStream extends DeserializationStream {
-
-    private final InputStream inputStream;
-    private final GryoSerializerInstance serializer;
-
-    public GryoDeserializationStream(final GryoSerializerInstance serializer, final InputStream inputStream) {
-        this.serializer = serializer;
-        this.inputStream = inputStream;
-    }
-
-    @Override
-    public <T> T readObject(final ClassTag<T> classTag) {
-        return (T) this.serializer.getKryo().readClassAndObject(new Input(this.inputStream));
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.inputStream.close();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
deleted file mode 100644
index f51444b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.SerializationStream;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializationStream extends SerializationStream {
-
-    private final OutputStream outputStream;
-    private final GryoSerializerInstance serializer;
-
-    public GryoSerializationStream(final GryoSerializerInstance serializer, final OutputStream outputStream) {
-        this.outputStream = outputStream;
-        this.serializer = serializer;
-    }
-
-    @Override
-    public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
-        try {
-            this.outputStream.write(this.serializer.serialize(t, classTag).array());
-            this.outputStream.flush();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-        return this;
-    }
-
-    @Override
-    public void flush() {
-        try {
-            this.outputStream.flush();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.outputStream.close();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
deleted file mode 100644
index c2e99a9..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.SerializableWritable;
-import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
-import org.apache.spark.scheduler.CompressedMapStatus;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.util.SerializableConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
-import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializer extends Serializer {
-    @Override
-    public SerializerInstance newInstance() {
-        return new GryoSerializerInstance(
-                GryoMapper.build().
-                        addCustom(SerializableWritable.class, new JavaSerializer()).
-                        addCustom(Tuple2.class, new JavaSerializer()).
-                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                        addCustom(HttpBroadcast.class, new JavaSerializer()).
-                        addCustom(PythonBroadcast.class, new JavaSerializer()).
-                        addCustom(MessagePayload.class, new JavaSerializer()).
-                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                        addCustom(ViewPayload.class, new JavaSerializer()).
-                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                        addCustom(VertexWritable.class, new JavaSerializer()).
-                        addCustom(ObjectWritable.class, new JavaSerializer()).
-                        create().createMapper());
-        // kryo.register(org.apache.spark.serializer.JavaIterableWrapperSerializer..MODULE$.wrapperClass(), new JavaIterableWrapperSerializer());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
deleted file mode 100644
index 9c74434..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
-import scala.reflect.ClassTag;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializerInstance extends SerializerInstance {
-
-    private final Kryo kryo;
-
-    public GryoSerializerInstance(final Kryo kryo) {
-        this.kryo = kryo;
-    }
-
-    @Override
-    public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
-        final Output output = new Output(100000);
-        this.kryo.writeClassAndObject(output, t);
-        output.flush();
-        return ByteBuffer.wrap(output.getBuffer());
-    }
-
-    @Override
-    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
-        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
-    }
-
-    @Override
-    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
-        this.kryo.setClassLoader(classLoader);
-        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
-    }
-
-    @Override
-    public SerializationStream serializeStream(final OutputStream outputStream) {
-        return new GryoSerializationStream(this, outputStream);
-    }
-
-    @Override
-    public DeserializationStream deserializeStream(final InputStream inputStream) {
-        return new GryoDeserializationStream(this, inputStream);
-    }
-
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
new file mode 100644
index 0000000..6c1a164
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.OutputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializationStream extends SerializationStream {
+
+    private final Output output;
+    private final GryoSerializerInstance gryoSerializer;
+
+    public GryoSerializationStream(final GryoSerializerInstance gryoSerializer, final OutputStream outputStream) {
+        this.output = new Output(outputStream);
+        this.gryoSerializer = gryoSerializer;
+    }
+
+    @Override
+    public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
+        final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
+        writer.getKryo().writeClassAndObject(this.output, t);
+        this.gryoSerializer.getGryoPool().offerWriter(writer);
+        return this;
+    }
+
+    @Override
+    public void flush() {
+        this.output.flush();
+    }
+
+    @Override
+    public void close() {
+        this.output.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
new file mode 100644
index 0000000..ee16126
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.python.PythonBroadcast;
+import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.network.util.ByteUnit;
+import org.apache.spark.scheduler.CompressedMapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializer extends Serializer {
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
+    //private final Option<String> userRegistrator;
+    private final long bufferSizeKb;
+    private final int bufferSize;
+    private final int maxBufferSizeMb;
+    private final int maxBufferSize;
+
+    private final GryoPool gryoPool;
+
+    public GryoSerializer(final SparkConf sparkConfiguration) {
+        this.bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
+        if (this.bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
+            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + this.bufferSizeKb + " mb.");
+        } else {
+            this.bufferSize = (int) ByteUnit.KiB.toBytes(this.bufferSizeKb);
+            this.maxBufferSizeMb = (int) sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
+            if (this.maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
+                throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + this.maxBufferSizeMb + " mb.");
+            } else {
+                this.maxBufferSize = (int) ByteUnit.MiB.toBytes(this.maxBufferSizeMb);
+                this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+                this.registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+                //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
+
+            }
+        }
+        this.gryoPool = new GryoPool(makeApacheConfiguration(sparkConfiguration), builder -> {
+            try {
+                builder.
+                        addCustom(SerializableWritable.class, new JavaSerializer()).
+                        addCustom(Tuple2.class, new JavaSerializer()).
+                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
+                        addCustom(HttpBroadcast.class, new JavaSerializer()).
+                        addCustom(PythonBroadcast.class, new JavaSerializer()).
+                        addCustom(BoxedUnit.class, new JavaSerializer()).
+                        addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
+                        addCustom(MessagePayload.class, new JavaSerializer()).
+                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+                        addCustom(ViewPayload.class, new JavaSerializer()).
+                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
+                        addCustom(VertexWritable.class, new JavaSerializer()).
+                        addCustom(ObjectWritable.class, new JavaSerializer());
+            } catch (final ClassNotFoundException e) {
+                throw new IllegalStateException(e);
+            }
+        }, kryo -> {
+            kryo.setRegistrationRequired(this.registrationRequired);
+            kryo.setReferences(this.referenceTracking);
+        });
+
+    }
+
+    public Output newOutput() {
+        return new Output(this.bufferSize, this.maxBufferSize);
+    }
+
+    public GryoPool getGryoPool() {
+        return this.gryoPool;
+    }
+
+    @Override
+    public SerializerInstance newInstance() {
+        return new GryoSerializerInstance(this);
+    }
+
+    private static Configuration makeApacheConfiguration(final SparkConf sparkConfiguration) {
+        final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        apacheConfiguration.setDelimiterParsingDisabled(true);
+        for (final Tuple2<String, String> tuple : sparkConfiguration.getAll()) {
+            apacheConfiguration.setProperty(tuple._1(), tuple._2());
+        }
+        return apacheConfiguration;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
new file mode 100644
index 0000000..764d8f0
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializerInstance extends SerializerInstance {
+
+    private final GryoSerializer gryoSerializer;
+    private final Output output;
+    private final Input input;
+
+    public GryoSerializerInstance(final GryoSerializer gryoSerializer) {
+        this.gryoSerializer = gryoSerializer;
+        this.input = new Input();
+        this.output = gryoSerializer.newOutput();
+    }
+
+    @Override
+    public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
+        final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
+        writer.getKryo().writeClassAndObject(this.output, t);
+        this.output.flush();
+        this.gryoSerializer.getGryoPool().offerWriter(writer);
+        return ByteBuffer.wrap(this.output.getBuffer());
+    }
+
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
+        this.input.setBuffer(byteBuffer.array());
+        final GryoReader reader = this.gryoSerializer.getGryoPool().takeReader();
+        final T t = (T) reader.getKryo().readClassAndObject(this.input);
+        this.gryoSerializer.getGryoPool().offerReader(reader);
+        return t;
+    }
+
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
+        return this.deserialize(byteBuffer, classTag);
+    }
+
+    @Override
+    public SerializationStream serializeStream(final OutputStream outputStream) {
+        return new GryoSerializationStream(this, outputStream);
+    }
+
+    @Override
+    public DeserializationStream deserializeStream(final InputStream inputStream) {
+        return new GryoDeserializationStream(this, inputStream);
+    }
+
+    public GryoPool getGryoPool() {
+        return this.gryoSerializer.getGryoPool();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 7ed741b..916a3d7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -115,7 +115,7 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
             /// spark configuration
             put("spark.master", "local[4]");
             // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo.GryoSerializer");
+            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
             // put("spark.kryo.registrationRequired",true);
         }};
     }