You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/27 11:27:55 UTC

[2/2] tinkerpop git commit: KryoShimServices now implement .close() so they can relinquish resources. KryoShimServiceLoader also implements close() and will close any static shim service it maintains. On a force reload, any existing KryoShimService is cl

KryoShimServices now implement .close() so they can relinquish resources. KryoShimServiceLoader also implements close() and will close any static shim service it maintains. On a force reload, any existing KryoShimService is close()d. This was necessary for the test suite because different IoRegistries were being loaded and dangling KRYO objects didn't have the loaded registries. Reverted GryoMapper to how it was and now the Spark-specific classes in GryoSerializer are simply an IoRegistry -- ta da. Going to do GraphSON IoRegistry testing in Giraph and Spark and then I think we are done with this branch. cc/ @dalaro.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f2488eff
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f2488eff
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f2488eff

Branch: refs/heads/TINKERPOP-1389
Commit: f2488effdb4307c4c17fc48ed60e5746e85104f3
Parents: 14e7668
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Oct 27 05:27:48 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Oct 27 05:27:48 2016 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoMapper.java   | 43 +++++-----
 .../io/gryo/kryoshim/KryoShimService.java       |  8 +-
 .../io/gryo/kryoshim/KryoShimServiceLoader.java | 13 ++-
 .../structure/io/HadoopPoolShimService.java     |  7 +-
 .../hadoop/structure/io/HadoopPools.java        |  4 +-
 .../structure/io/AbstractIoRegistryCheck.java   |  2 +
 .../spark/structure/io/gryo/GryoSerializer.java | 85 ++++++++++++--------
 .../io/gryo/IoRegistryAwareKryoSerializer.java  |  2 +-
 .../unshaded/UnshadedKryoShimService.java       |  6 ++
 .../SparkHadoopGraphGryoSerializerProvider.java |  2 +
 .../computer/SparkHadoopGraphProvider.java      |  7 +-
 .../structure/io/SparkIoRegistryCheck.java      |  5 ++
 12 files changed, 120 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index f7a9b25..cc29b51 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -393,6 +393,8 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119));
         }};
 
+        private final List<IoRegistry> registries = new ArrayList<>();
+
         /**
          * Starts numbering classes for Gryo serialization at 65536 to leave room for future usage by TinkerPop.
          */
@@ -425,23 +427,7 @@ public final class GryoMapper implements Mapper<Kryo> {
         @Override
         public Builder addRegistry(final IoRegistry registry) {
             if (null == registry) throw new IllegalArgumentException("The registry cannot be null");
-            final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
-            serializers.forEach(p -> {
-                if (null == p.getValue1())
-                    addCustom(p.getValue0());
-                else if (p.getValue1() instanceof SerializerShim)
-                    addCustom(p.getValue0(), new ShadedSerializerAdapter((SerializerShim) p.getValue1()));
-                else if (p.getValue1() instanceof Serializer)
-                    addCustom(p.getValue0(), (Serializer) p.getValue1());
-                else if (p.getValue1() instanceof Function)
-                    addCustom(p.getValue0(), (Function<Kryo, Serializer>) p.getValue1());
-                else
-                    throw new IllegalStateException(String.format(
-                            "Unexpected value provided by %s for serializable class %s - expected a parameter in [null, %s (or shim) implementation or Function<%s, %s>], but received %s",
-                            registry.getClass().getSimpleName(), p.getValue0().getClass().getCanonicalName(),
-                            Serializer.class.getName(), Kryo.class.getSimpleName(),
-                            Serializer.class.getSimpleName(), p.getValue1()));
-            });
+            this.registries.add(registry);
             return this;
         }
 
@@ -529,6 +515,27 @@ public final class GryoMapper implements Mapper<Kryo> {
          * Creates a {@code GryoMapper}.
          */
         public GryoMapper create() {
+            // consult the registry if provided and inject registry entries as custom classes.
+            registries.forEach(registry -> {
+                final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
+                serializers.forEach(p -> {
+                    if (null == p.getValue1())
+                        addCustom(p.getValue0());
+                    else if (p.getValue1() instanceof SerializerShim)
+                        addCustom(p.getValue0(), new ShadedSerializerAdapter((SerializerShim) p.getValue1()));
+                    else if (p.getValue1() instanceof Serializer)
+                        addCustom(p.getValue0(), (Serializer) p.getValue1());
+                    else if (p.getValue1() instanceof Function)
+                        addCustom(p.getValue0(), (Function<Kryo, Serializer>) p.getValue1());
+                    else
+                        throw new IllegalStateException(String.format(
+                                "Unexpected value provided by %s for serializable class %s - expected a parameter in [null, %s implementation or Function<%s, %s>], but received %s",
+                                registry.getClass().getSimpleName(), p.getValue0().getClass().getCanonicalName(),
+                                Serializer.class.getName(), Kryo.class.getSimpleName(),
+                                Serializer.class.getSimpleName(), p.getValue1()));
+                });
+            });
+
             return new GryoMapper(this);
         }
 
@@ -661,4 +668,4 @@ public final class GryoMapper implements Mapper<Kryo> {
                     .toString();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index 4d3ece5..4422e1b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -57,7 +57,7 @@ public interface KryoShimService {
     /**
      * Serializes an object to an output stream.  This may flush the output stream.
      *
-     * @param o the object to serialize
+     * @param o    the object to serialize
      * @param sink the stream into which the serialized object is written
      */
     public void writeClassAndObject(final Object o, final OutputStream sink);
@@ -92,4 +92,10 @@ public interface KryoShimService {
      * @param conf the configuration to apply to this service's internal serializer
      */
     public void applyConfiguration(final Configuration conf);
+
+    /**
+     * Release all resources associated with the shim service.
+     * This is called on a forced reload or when the {@link KryoShimServiceLoader} is closed.
+     */
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index f41b007..70be7ad 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -51,12 +51,19 @@ public class KryoShimServiceLoader {
     public static void applyConfiguration(final Configuration configuration) {
         if (null == KryoShimServiceLoader.configuration ||
                 null == KryoShimServiceLoader.cachedShimService ||
-                !ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration))) {
+                !KryoShimServiceLoader.configuration.getKeys().hasNext()) {
             KryoShimServiceLoader.configuration = configuration;
             load(true);
         }
     }
 
+    public static void close() {
+        if (null != cachedShimService)
+            cachedShimService.close();
+        cachedShimService = null;
+        configuration = null;
+    }
+
     /**
      * Return a reference to the shim service.  This method may return a cached shim service
      * unless {@code forceReload} is true.  Calls to this method need not be externally
@@ -72,6 +79,10 @@ public class KryoShimServiceLoader {
         if (null != cachedShimService && !forceReload)
             return cachedShimService;
 
+        // if a service is already loaded, close it
+        if (null != cachedShimService)
+            cachedShimService.close();
+
         // if the configuration is null, try and load the configuration from System.properties
         if (null == configuration)
             configuration = SystemUtil.getSystemPropertiesConfiguration("tinkerpop", true);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index a52eac4..db79d97 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -20,11 +20,9 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
@@ -50,6 +48,11 @@ public class HadoopPoolShimService implements KryoShimService {
     }
 
     @Override
+    public void close() {
+        HadoopPools.close();
+    }
+
+    @Override
     public int getPriority() {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 8f7b97c..e652509 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -44,8 +44,8 @@ public final class HadoopPools {
             GRYO_POOL = GryoPool.build().
                     poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
                     ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
-                    //initializeMapper(m -> m.registrationRequired(false)).
-                            create();
+                    initializeMapper(m -> m.registrationRequired(false)).
+                    create();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
index 9002d57..db947c2 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
@@ -65,6 +65,8 @@ public abstract class AbstractIoRegistryCheck extends AbstractGremlinTest {
         graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
         graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath());
         graph.configuration().setProperty(GryoPool.CONFIG_IO_REGISTRY, ToyIoRegistry.class.getCanonicalName());
+        final GryoOutputFormat inputFormat = new GryoOutputFormat();
+        inputFormat.getRecordWriter()
         final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration()));
         validateIoRegistryGraph(graph, graphComputerClass, writer, GryoInputFormat.class);
         assertTrue(input.delete());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 1b9fa3b..3bdf81f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -41,6 +41,8 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayloa
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.apache.tinkerpop.shaded.kryo.serializers.ExternalizableSerializer;
@@ -51,7 +53,9 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -67,8 +71,8 @@ public final class GryoSerializer extends Serializer implements Serializable {
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        registrationRequired = sparkConfiguration.getBoolean(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, false);
+        this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        this.registrationRequired = sparkConfiguration.getBoolean(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -81,40 +85,16 @@ public final class GryoSerializer extends Serializer implements Serializable {
             }
         }
         // create a GryoPool and store it in static HadoopPools
+        final List<Object> ioRegistries = new ArrayList<>();
+        ioRegistries.addAll(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList()));
+        ioRegistries.add(SparkIoRegistry.class.getCanonicalName().replace("." + SparkIoRegistry.class.getSimpleName(), "$" + SparkIoRegistry.class.getSimpleName()));
         HadoopPools.initialize(GryoPool.build().
                 poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
-                initializeMapper(builder -> {
-                    try {
-                        builder.addCustom(Tuple2.class, new Tuple2Serializer())
-                                .addCustom(Tuple2[].class)
-                                .addCustom(Tuple3.class, new Tuple3Serializer())
-                                .addCustom(Tuple3[].class)
-                                .addCustom(CompactBuffer.class, new CompactBufferSerializer())
-                                .addCustom(CompactBuffer[].class)
-                                .addCustom(CompressedMapStatus.class)
-                                .addCustom(BlockManagerId.class)
-                                .addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer())   // externalizable implemented so its okay
-                                .addCustom(TorrentBroadcast.class)
-                                .addCustom(PythonBroadcast.class)
-                                .addCustom(BoxedUnit.class)
-                                .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
-                                .addCustom(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer())
-                                .addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer())
-                                .addCustom(MessagePayload.class)
-                                .addCustom(ViewIncomingPayload.class)
-                                .addCustom(ViewOutgoingPayload.class)
-                                .addCustom(ViewPayload.class)
-                                .addCustom(SerializableConfiguration.class, new JavaSerializer())
-                                .addCustom(VertexWritable.class, new VertexWritableSerializer())
-                                .addCustom(ObjectWritable.class, new ObjectWritableSerializer())
-                                .referenceTracking(this.referenceTracking)
-                                .registrationRequired(this.registrationRequired);
-                        // add these as we find ClassNotFoundExceptions
-                    } catch (final ClassNotFoundException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }).create());
+                ioRegistries(ioRegistries).
+                initializeMapper(builder ->
+                        builder.referenceTracking(this.referenceTracking).
+                                registrationRequired(this.registrationRequired)).
+                create());
     }
 
     public Output newOutput() {
@@ -138,4 +118,41 @@ public final class GryoSerializer extends Serializer implements Serializable {
         }
         return apacheConfiguration;
     }
+
+    public static class SparkIoRegistry extends AbstractIoRegistry {
+        private static final SparkIoRegistry INSTANCE = new SparkIoRegistry();
+
+        private SparkIoRegistry() {
+            try {
+                super.register(GryoIo.class, Tuple2.class, new Tuple2Serializer());
+                super.register(GryoIo.class, Tuple2[].class, null);
+                super.register(GryoIo.class, Tuple3.class, new Tuple3Serializer());
+                super.register(GryoIo.class, Tuple3[].class, null);
+                super.register(GryoIo.class, CompactBuffer.class, new CompactBufferSerializer());
+                super.register(GryoIo.class, CompactBuffer[].class, null);
+                super.register(GryoIo.class, CompressedMapStatus.class, null);
+                super.register(GryoIo.class, BlockManagerId.class, null);
+                super.register(GryoIo.class, HighlyCompressedMapStatus.class, new ExternalizableSerializer());  // externalizable implemented so its okay
+                super.register(GryoIo.class, TorrentBroadcast.class, null);
+                super.register(GryoIo.class, PythonBroadcast.class, null);
+                super.register(GryoIo.class, BoxedUnit.class, null);
+                super.register(GryoIo.class, Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer());
+                super.register(GryoIo.class, Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer());
+                super.register(GryoIo.class, WrappedArray.ofRef.class, new WrappedArraySerializer());
+                super.register(GryoIo.class, MessagePayload.class, null);
+                super.register(GryoIo.class, ViewIncomingPayload.class, null);
+                super.register(GryoIo.class, ViewOutgoingPayload.class, null);
+                super.register(GryoIo.class, ViewPayload.class, null);
+                super.register(GryoIo.class, SerializableConfiguration.class, new JavaSerializer());
+                super.register(GryoIo.class, VertexWritable.class, new VertexWritableSerializer());
+                super.register(GryoIo.class, ObjectWritable.class, new ObjectWritableSerializer());
+            } catch (final ClassNotFoundException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        public static SparkIoRegistry getInstance() {
+            return INSTANCE;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index ba6d001..1385a5b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -70,7 +70,7 @@ public final class IoRegistryAwareKryoSerializer extends KryoSerializer {
             else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter)
                 kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) type.getShadedSerializer()).getSerializerShim()), type.getId());
             else
-                kryo.register(type.getTargetClass(), type.getId());
+                kryo.register(type.getTargetClass(), kryo.getDefaultSerializer(type.getTargetClass()), type.getId());
         }
         return kryo;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 4932acb..caf5268 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -90,6 +90,12 @@ public class UnshadedKryoShimService implements KryoShimService {
         initialize(configuration);
     }
 
+    @Override
+    public void close() {
+        INITIALIZED = false;
+        KRYOS.clear();
+    }
+
     private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration) {
         // DCL is safe in this case due to volatility
         if (!INITIALIZED) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 0e7fe0d..6ebe626 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -34,6 +35,7 @@ public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGra
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         if (!SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
             Spark.close();
+            KryoShimServiceLoader.close();
             System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName());
         }
         final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 8bc6519..dcec3f8 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -27,7 +27,6 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.ToyIoRegistry;
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -41,11 +40,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkIoRegistryCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -61,6 +59,7 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) {
             Spark.close();
+            KryoShimServiceLoader.close();
             System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName());
         }
 
@@ -98,8 +97,6 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
         config.put(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
         config.put(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
         config.put(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, true);
-        // TODO: why do I need this here?!
-        config.put(GryoPool.CONFIG_IO_REGISTRY, ToyIoRegistry.class.getCanonicalName());
         return config;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2488eff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
index f748d86..54ed4ed 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
@@ -24,6 +24,9 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractIoRegistryCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
+import org.apache.tinkerpop.gremlin.util.SystemUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,6 +42,7 @@ public class SparkIoRegistryCheck extends AbstractIoRegistryCheck {
         SparkContextStorage.open("local[4]");
         Spark.close();
         HadoopPools.close();
+        KryoShimServiceLoader.close();
     }
 
     @After
@@ -47,6 +51,7 @@ public class SparkIoRegistryCheck extends AbstractIoRegistryCheck {
         Spark.create("local[4]");
         Spark.close();
         HadoopPools.close();
+        KryoShimServiceLoader.close();
     }
 
     @Test