You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 20:21:49 UTC

tinkerpop git commit: lots of good stuff here -- finally have testing of IORegistry in SparkGraphComputer. Have ToyPoint and TestIoRegistry. Realized a bunch of stupid .flush() calls in the GryoSerializer serializers (Spark)... perhaps that is why things

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 a97ba5745 -> 77fb25fff


lots of good stuff here -- finally have testing of IORegistry in SparkGraphComputer. Have ToyPoint and TestIoRegistry. Realized a bunch of stupid .flush() calls in the GryoSerializer serializers (Spark)... perhaps that is why things are slower than KryoSerializer. Really cleaned up IoRegistryAwareKryoSerializer. Added getShim() methods to the Shaded/UnshadedSerializerAdaptors. I'm a stud. cc/ @dalaro


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/77fb25ff
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/77fb25ff
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/77fb25ff

Branch: refs/heads/TINKERPOP-1389
Commit: 77fb25fff97ce3fc3c65fb253e1b851b7956aaaf
Parents: a97ba57
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Oct 26 14:21:42 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 14:21:42 2016 -0600

----------------------------------------------------------------------
 .../io/gryo/kryoshim/KryoShimServiceLoader.java |   2 +-
 .../shaded/ShadedSerializerAdapter.java         |   6 +-
 .../gremlin/hadoop/HadoopGraphProvider.java     |   1 -
 .../io/gryo/CompactBufferSerializer.groovy      |   2 -
 .../io/gryo/IoRegistryAwareKryoSerializer.java  |  39 +++----
 .../io/gryo/ObjectWritableSerializer.java       |   1 -
 .../structure/io/gryo/Tuple2Serializer.java     |   2 -
 .../structure/io/gryo/Tuple3Serializer.java     |   3 -
 .../io/gryo/VertexWritableSerializer.java       |   1 -
 .../io/gryo/WrappedArraySerializer.java         |   1 -
 .../unshaded/UnshadedSerializerAdapter.java     |  18 ++--
 .../gremlin/spark/AbstractSparkTest.java        |   7 +-
 .../structure/io/gryo/GryoIoRegistryTest.java   | 101 +++++++++++++++++++
 .../spark/structure/io/gryo/TestIoRegistry.java |  40 ++++++++
 .../spark/structure/io/gryo/ToyPoint.java       |  75 ++++++++++++++
 15 files changed, 255 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 5f50f9e..9287b10 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -114,7 +114,7 @@ public class KryoShimServiceLoader {
             throw new IllegalStateException("Unable to load KryoShimService");
 
         // once the shim service is defined, configure it
-        log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################",
+        log.info("Configuring KryoShimService {} with following configuration:\n#######START########\n{}\n########END#########",
                 cachedShimService.getClass().getCanonicalName(),
                 ConfigurationUtils.toString(configuration));
         cachedShimService.applyConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
index 28a44bd..fca19c7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
@@ -26,7 +26,7 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
 
 public class ShadedSerializerAdapter<T> extends Serializer<T> {
 
-    SerializerShim<T> serializer;
+    private final SerializerShim<T> serializer;
 
     public ShadedSerializerAdapter(final SerializerShim<T> serializer) {
         this.serializer = serializer;
@@ -51,4 +51,8 @@ public class ShadedSerializerAdapter<T> extends Serializer<T> {
         final ShadedInputAdapter shadedInputAdapter = new ShadedInputAdapter(input);
         return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
     }
+
+    public SerializerShim<T> getSerializerShim() {
+        return this.serializer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 9c6a352..c95ede5 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -111,7 +111,6 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
 
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
-        System.clearProperty(KRYO_SHIM_SERVICE);
         this.graphSONInput = RANDOM.nextBoolean();
         return new HashMap<String, Object>() {{
             put(Graph.GRAPH, HadoopGraph.class.getName());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
index be491c4..693ced6 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
@@ -44,10 +44,8 @@ public final class CompactBufferSerializer<T> extends Serializer<CompactBuffer<T
         kryo.writeClassAndObject(output, compactBuffer.evidence$1);
         kryo.writeClassAndObject(output, compactBuffer.element0);
         kryo.writeClassAndObject(output, compactBuffer.element1);
-        output.flush();
         output.writeVarInt(compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize, true);
         kryo.writeClassAndObject(output, compactBuffer.otherElements);
-        output.flush();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index 6d9b536..ba6d001 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -31,41 +31,46 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.Un
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
  */
-public class IoRegistryAwareKryoSerializer extends KryoSerializer {
-
-    private final SparkConf configuration;
+public final class IoRegistryAwareKryoSerializer extends KryoSerializer {
 
     private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
 
+    private final List<TypeRegistration<?>> typeRegistrations = new ArrayList<>();
+
     public IoRegistryAwareKryoSerializer(final SparkConf configuration) {
         super(configuration);
-        // store conf so that we can access its registry (if one is present) in newKryo()
-        this.configuration = configuration;
+        if (!configuration.contains(GryoPool.CONFIG_IO_REGISTRY))
+            log.info("SparkConf does not contain a {} property. Skipping {} processing.", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
+        else {
+            final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
+            for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
+                log.info("Registering {} with serializer type: {}", type.getTargetClass().getCanonicalName(), type);
+                this.typeRegistrations.add(type);
+            }
+        }
     }
 
     @Override
     public Kryo newKryo() {
         final Kryo kryo = super.newKryo();
-        return applyIoRegistryIfPresent(kryo);
-    }
-
-    private Kryo applyIoRegistryIfPresent(final Kryo kryo) {
-        if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) {
-            log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
-            return kryo;
-        }
-        final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
-        for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
-            log.info("Registering {} with serializer {} and id {}", type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId());
-            kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId());
+        for (final TypeRegistration<?> type : this.typeRegistrations) {
+            if (null != type.getSerializerShim())
+                kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(type.getSerializerShim()), type.getId());
+            else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter)
+                kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) type.getShadedSerializer()).getSerializerShim()), type.getId());
+            else
+                kryo.register(type.getTargetClass(), type.getId());
         }
         return kryo;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
index 01be50d..2ec9615 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
@@ -37,7 +37,6 @@ public final class ObjectWritableSerializer<T> implements SerializerShim<ObjectW
     @Override
     public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ObjectWritable<T> starGraph) {
         kryo.writeClassAndObject(output, starGraph.get());
-        output.flush();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
index 05d0b9e..c610286 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
@@ -34,9 +34,7 @@ public final class Tuple2Serializer<A, B> extends Serializer<Tuple2<A, B>> {
     @Override
     public void write(final Kryo kryo, final Output output, final Tuple2<A, B> tuple2) {
         kryo.writeClassAndObject(output, tuple2._1());
-        output.flush();
         kryo.writeClassAndObject(output, tuple2._2());
-        output.flush();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
index f188794..46a7d02 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
@@ -33,11 +33,8 @@ public final class Tuple3Serializer<A, B, C> extends Serializer<Tuple3<A, B, C>>
     @Override
     public void write(final Kryo kryo, final Output output, final Tuple3<A, B, C> tuple3) {
         kryo.writeClassAndObject(output, tuple3._1());
-        output.flush();
         kryo.writeClassAndObject(output, tuple3._2());
-        output.flush();
         kryo.writeClassAndObject(output, tuple3._3());
-        output.flush();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
index c89fb05..93c86d8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
@@ -38,7 +38,6 @@ public final class VertexWritableSerializer implements SerializerShim<VertexWrit
     @Override
     public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final VertexWritable vertexWritable) {
         kryo.writeObject(output, vertexWritable.get().graph());
-        output.flush();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
index 8de1955..803a19c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -36,7 +36,6 @@ public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>>
         output.writeVarInt(iterable.size(), true);
         JavaConversions.asJavaCollection(iterable).forEach(t -> {
             kryo.writeClassAndObject(output, t);
-            output.flush();
         });
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
index a5f8b05..452c47a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
@@ -16,12 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-/**
- * Copyright DataStax, Inc.
- *
- * Please see the included license file for details.
- */
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -30,10 +24,9 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 
-public class UnshadedSerializerAdapter<T> extends Serializer<T>
-{
+public class UnshadedSerializerAdapter<T> extends Serializer<T> {
 
-    SerializerShim<T> serializer;
+    private final SerializerShim<T> serializer;
 
     public UnshadedSerializerAdapter(final SerializerShim<T> serializer) {
         this.serializer = serializer;
@@ -48,10 +41,13 @@ public class UnshadedSerializerAdapter<T> extends Serializer<T>
     }
 
     @Override
-    public T read(final Kryo kryo, final Input input, final Class<T> aClass)
-    {
+    public T read(final Kryo kryo, final Input input, final Class<T> aClass) {
         UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo);
         UnshadedInputAdapter shadedInputAdapter = new UnshadedInputAdapter(input);
         return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
     }
+
+    public SerializerShim<T> getSerializerShim() {
+        return this.serializer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
index 6d2231f..ab2cf2f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
@@ -45,7 +46,7 @@ public abstract class AbstractSparkTest {
     public void setupTest() {
         SparkConf sparkConfiguration = new SparkConf();
         sparkConfiguration.setAppName(this.getClass().getCanonicalName() + "-setupTest");
-        sparkConfiguration.set("spark.master", "local[4]");
+        sparkConfiguration.set(SparkLauncher.SPARK_MASTER, "local[4]");
         JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
         sparkContext.close();
         Spark.create(sparkContext.sc());
@@ -56,9 +57,9 @@ public abstract class AbstractSparkTest {
     protected Configuration getBaseConfiguration() {
         final BaseConfiguration configuration = new BaseConfiguration();
         configuration.setDelimiterParsingDisabled(true);
-        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[4]");
         configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
-        configuration.setProperty("spark.kryo.registrationRequired", true);
+        configuration.setProperty(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, true);
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         return configuration;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
new file mode 100644
index 0000000..0260d02
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordWriter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoIoRegistryTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldSupportIoRegistry() throws Exception {
+        final File input = TestHelper.generateTempFile(this.getClass(), "input", ".kryo");
+        final Configuration configuration = super.getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(GryoPool.CONFIG_IO_REGISTRY, TestIoRegistry.class.getCanonicalName());
+        //configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
+
+        HadoopGraph graph = HadoopGraph.open(configuration);
+
+        final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(configuration));
+        for (int i = 0; i < 10; i++) {
+            final StarGraph starGraph = StarGraph.open();
+            starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i);
+            writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex()));
+        }
+        writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(configuration), new TaskAttemptID()));
+        // OLAP TESTING //
+        final List<ToyPoint> points = graph.traversal().withComputer(SparkGraphComputer.class).V().<ToyPoint>values("point").toList();
+        assertEquals(10, points.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(points.contains(new ToyPoint(i, i * 10)));
+        }
+        points.clear();
+        // OLTP TESTING //
+        graph.traversal().V().<ToyPoint>values("point").fill(points);
+        assertEquals(10, points.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(points.contains(new ToyPoint(i, i * 10)));
+        }
+        points.clear();
+        // HDFS TESTING //
+        final List<Vertex> list = IteratorUtils.asList(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(configuration)).head(input.getAbsolutePath(), GryoInputFormat.class));
+        list.forEach(v -> points.add(v.value("point")));
+        assertEquals(10, points.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(points.contains(new ToyPoint(i, i * 10)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
new file mode 100644
index 0000000..9a78aab
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TestIoRegistry extends AbstractIoRegistry {
+
+    private static final TestIoRegistry INSTANCE = new TestIoRegistry();
+
+    private TestIoRegistry() {
+        super.register(GryoIo.class, ToyPoint.class, new ShadedSerializerAdapter<>(new ToyPoint.ToyPointSerializer()));
+    }
+
+    public static TestIoRegistry getInstance() {
+        return INSTANCE;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
new file mode 100644
index 0000000..e46e9c3
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ToyPoint implements Serializable {
+
+    private final int x;
+    private final int y;
+
+    public ToyPoint(final int x, final int y) {
+        this.x = x;
+        this.y = y;
+    }
+
+    public int getX() {
+        return this.x;
+    }
+
+    public int getY() {
+        return this.y;
+    }
+
+    public int hashCode() {
+        return this.x + this.y;
+    }
+
+    public boolean equals(final Object other) {
+        return other instanceof ToyPoint && ((ToyPoint) other).x == this.x && ((ToyPoint) other).y == this.y;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + this.x + "," + this.y + "]";
+    }
+
+    public static class ToyPointSerializer implements SerializerShim<ToyPoint> {
+        @Override
+        public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ToyPoint toyPoint) {
+            output.writeInt(toyPoint.x);
+            output.writeInt(toyPoint.y);
+        }
+
+        @Override
+        public <I extends InputShim> ToyPoint read(final KryoShim<I, ?> kryo, final I input, final Class<ToyPoint> toyPointClass) {
+            return new ToyPoint(input.readInt(), input.readInt());
+        }
+    }
+}