You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/06 20:53:41 UTC

[02/10] incubator-tinkerpop git commit: Introduce Kryo shim to support serializer reuse

Introduce Kryo shim to support serializer reuse


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ef528697
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ef528697
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ef528697

Branch: refs/heads/TINKERPOP-1321
Commit: ef52869788ebf5b8b825f78ef21e1d38423d9aa0
Parents: aa673db
Author: Dan LaRocque <da...@hopcount.org>
Authored: Thu Jun 2 02:09:29 2016 -0500
Committer: Dan LaRocque <da...@hopcount.org>
Committed: Thu Jun 2 03:09:29 2016 -0400

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoMapper.java   | 403 ++++++++++++-------
 .../structure/io/gryo/TypeRegistration.java     |  70 ++++
 .../structure/io/gryo/URISerializer.java        |  23 +-
 .../structure/io/gryo/UUIDSerializer.java       |  20 +-
 .../structure/io/kryoshim/InputShim.java        |  37 ++
 .../gremlin/structure/io/kryoshim/KryoShim.java |  40 ++
 .../structure/io/kryoshim/KryoShimService.java  |  83 ++++
 .../io/kryoshim/KryoShimServiceLoader.java      | 137 +++++++
 .../structure/io/kryoshim/OutputShim.java       |  41 ++
 .../structure/io/kryoshim/SerializerShim.java   |  35 ++
 .../structure/io/kryoshim/package-info.java     |  54 +++
 .../io/kryoshim/shaded/ShadedInputAdapter.java  |  66 +++
 .../io/kryoshim/shaded/ShadedKryoAdapter.java   |  67 +++
 .../io/kryoshim/shaded/ShadedOutputAdapter.java |  72 ++++
 .../shaded/ShadedSerializerAdapter.java         |  54 +++
 .../io/kryoshim/shaded/package-info.java        |  25 ++
 .../util/star/StarGraphGryoSerializer.java      | 123 +-----
 .../util/star/StarGraphSerializer.java          | 140 +++++++
 .../structure/io/HadoopPoolShimService.java     |  69 ++++
 .../hadoop/structure/io/HadoopPools.java        |   1 +
 .../hadoop/structure/io/ObjectWritable.java     |  25 +-
 .../hadoop/structure/io/VertexWritable.java     |  38 +-
 ...remlin.structure.io.kryoshim.KryoShimService |   1 +
 23 files changed, 1293 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 9cae845..5bc71da 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Contains;
@@ -56,6 +57,8 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
 import org.apache.tinkerpop.gremlin.structure.io.Mapper;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded.ShadedSerializerAdapter;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
@@ -69,6 +72,7 @@ import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProp
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGryoSerializer;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer;
 import org.apache.tinkerpop.shaded.kryo.ClassResolver;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.KryoSerializable;
@@ -77,7 +81,6 @@ import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
 import org.apache.tinkerpop.shaded.kryo.util.DefaultStreamFactory;
 import org.apache.tinkerpop.shaded.kryo.util.MapReferenceResolver;
 import org.javatuples.Pair;
-import org.javatuples.Triplet;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -156,13 +159,13 @@ import java.util.stream.Collectors;
 public final class GryoMapper implements Mapper<Kryo> {
     public static final byte[] GIO = "gio".getBytes();
     public static final byte[] HEADER = Arrays.copyOf(GIO, 16);
-    private final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList;
+    private final List<TypeRegistration<?>> typeRegistrations;
     private final boolean registrationRequired;
     private final boolean referenceTracking;
     private final Supplier<ClassResolver> classResolver;
 
     private GryoMapper(final Builder builder) {
-        this.serializationList = builder.serializationList;
+        this.typeRegistrations = builder.typeRegistrations;
         validate();
 
         this.registrationRequired = builder.registrationRequired;
@@ -176,19 +179,17 @@ public final class GryoMapper implements Mapper<Kryo> {
         kryo.addDefaultSerializer(Map.Entry.class, new EntrySerializer());
         kryo.setRegistrationRequired(registrationRequired);
         kryo.setReferences(referenceTracking);
-
-        serializationList.forEach(p -> {
-            final Function<Kryo, Serializer> serializer = p.getValue1();
-            if (null == serializer)
-                kryo.register(p.getValue0(), kryo.getDefaultSerializer(p.getValue0()), p.getValue2());
-            else
-                kryo.register(p.getValue0(), serializer.apply(kryo), p.getValue2());
-        });
+        for (TypeRegistration tr : typeRegistrations)
+            tr.registerWith(kryo);
         return kryo;
     }
 
     public List<Class> getRegisteredClasses() {
-        return this.serializationList.stream().map(Triplet::getValue0).collect(Collectors.toList());
+        return this.typeRegistrations.stream().map(TypeRegistration::getTargetClass).collect(Collectors.toList());
+    }
+
+    public List<TypeRegistration<?>> getTypeRegistrations() {
+        return typeRegistrations;
     }
 
     public static Builder build() {
@@ -199,11 +200,11 @@ public final class GryoMapper implements Mapper<Kryo> {
         final Set<Integer> duplicates = new HashSet<>();
 
         final Set<Integer> ids = new HashSet<>();
-        serializationList.forEach(t -> {
-            if (!ids.contains(t.getValue2()))
-                ids.add(t.getValue2());
+        typeRegistrations.forEach(t -> {
+            if (!ids.contains(t.getId()))
+                ids.add(t.getId());
             else
-                duplicates.add(t.getValue2());
+                duplicates.add(t.getId());
         });
 
         if (duplicates.size() > 0)
@@ -244,119 +245,119 @@ public final class GryoMapper implements Mapper<Kryo> {
          * Note that the following are pre-registered boolean, Boolean, byte, Byte, char, Character, double, Double,
          * int, Integer, float, Float, long, Long, short, Short, String, void.
          */
-        private final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList = new ArrayList<Triplet<Class, Function<Kryo, Serializer>, Integer>>() {{
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(byte[].class, null, 25));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(char[].class, null, 26));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(short[].class, null, 27));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(int[].class, null, 28));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(long[].class, null, 29));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(float[].class, null, 30));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(double[].class, null, 31));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(String[].class, null, 32));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Object[].class, null, 33));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ArrayList.class, null, 10));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(BigInteger.class, null, 34));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(BigDecimal.class, null, 35));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Calendar.class, null, 39));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Class.class, null, 41));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collection.class, null, 37));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.EMPTY_LIST.getClass(), null, 51));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.EMPTY_MAP.getClass(), null, 52));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.EMPTY_SET.getClass(), null, 53));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.singleton(null).getClass(), null, 54));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.singletonList(null).getClass(), null, 24));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Collections.singletonMap(null, null).getClass(), null, 23));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Contains.class, null, 49));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Currency.class, null, 40));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Date.class, null, 38));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Direction.class, null, 12));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DetachedEdge.class, null, 21));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DetachedVertexProperty.class, null, 20));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DetachedProperty.class, null, 18));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DetachedVertex.class, null, 19));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DetachedPath.class, null, 60));
+        private final List<TypeRegistration<?>> typeRegistrations = new ArrayList<TypeRegistration<?>>() {{
+            add(GryoTypeReg.of(byte[].class, 25));
+            add(GryoTypeReg.of(char[].class, 26));
+            add(GryoTypeReg.of(short[].class, 27));
+            add(GryoTypeReg.of(int[].class, 28));
+            add(GryoTypeReg.of(long[].class, 29));
+            add(GryoTypeReg.of(float[].class, 30));
+            add(GryoTypeReg.of(double[].class, 31));
+            add(GryoTypeReg.of(String[].class, 32));
+            add(GryoTypeReg.of(Object[].class, 33));
+            add(GryoTypeReg.of(ArrayList.class, 10));
+            add(GryoTypeReg.of(BigInteger.class, 34));
+            add(GryoTypeReg.of(BigDecimal.class, 35));
+            add(GryoTypeReg.of(Calendar.class, 39));
+            add(GryoTypeReg.of(Class.class, 41));
+            add(GryoTypeReg.of(Collection.class, 37));
+            add(GryoTypeReg.of(Collections.EMPTY_LIST.getClass(), 51));
+            add(GryoTypeReg.of(Collections.EMPTY_MAP.getClass(), 52));
+            add(GryoTypeReg.of(Collections.EMPTY_SET.getClass(), 53));
+            add(GryoTypeReg.of(Collections.singleton(null).getClass(), 54));
+            add(GryoTypeReg.of(Collections.singletonList(null).getClass(), 24));
+            add(GryoTypeReg.of(Collections.singletonMap(null, null).getClass(), 23));
+            add(GryoTypeReg.of(Contains.class, 49));
+            add(GryoTypeReg.of(Currency.class, 40));
+            add(GryoTypeReg.of(Date.class, 38));
+            add(GryoTypeReg.of(Direction.class, 12));
+            add(GryoTypeReg.of(DetachedEdge.class, 21));
+            add(GryoTypeReg.of(DetachedVertexProperty.class, 20));
+            add(GryoTypeReg.of(DetachedProperty.class, 18));
+            add(GryoTypeReg.of(DetachedVertex.class, 19));
+            add(GryoTypeReg.of(DetachedPath.class, 60));
             // skip 14
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(EnumSet.class, null, 46));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(HashMap.class, null, 11));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(HashMap.Entry.class, null, 16));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(HASH_MAP_NODE, null, 92));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(KryoSerializable.class, null, 36));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LinkedHashMap.class, null, 47));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LinkedHashSet.class, null, 71));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LinkedList.class, null, 116));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LINKED_HASH_MAP_ENTRY_CLASS, null, 15));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Locale.class, null, 22));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(StringBuffer.class, null, 43));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(StringBuilder.class, null, 44));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(T.class, null, 48));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TimeZone.class, null, 42));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeMap.class, null, 45));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeSet.class, null, 50));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(UUID.class, kryo -> new UUIDSerializer(), 17));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(URI.class, kryo -> new URISerializer(), 72));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(VertexTerminator.class, null, 13));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ReferenceEdge.class, null, 81));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ReferenceVertexProperty.class, null, 82));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ReferenceProperty.class, null, 83));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ReferenceVertex.class, null, 84));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ReferencePath.class, null, 85));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(StarGraph.class, kryo -> StarGraphGryoSerializer.with(Direction.BOTH), 86));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Edge.class, kryo -> new GryoSerializers.EdgeSerializer(), 65));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Vertex.class, kryo -> new GryoSerializers.VertexSerializer(), 66));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Property.class, kryo -> new GryoSerializers.PropertySerializer(), 67));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(VertexProperty.class, kryo -> new GryoSerializers.VertexPropertySerializer(), 68));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Path.class, kryo -> new GryoSerializers.PathSerializer(), 59));
+            add(GryoTypeReg.of(EnumSet.class, 46));
+            add(GryoTypeReg.of(HashMap.class, 11));
+            add(GryoTypeReg.of(HashMap.Entry.class, 16));
+            add(GryoTypeReg.of(HASH_MAP_NODE, 92));
+            add(GryoTypeReg.of(KryoSerializable.class, 36));
+            add(GryoTypeReg.of(LinkedHashMap.class, 47));
+            add(GryoTypeReg.of(LinkedHashSet.class, 71));
+            add(GryoTypeReg.of(LinkedList.class, 116));
+            add(GryoTypeReg.of(LINKED_HASH_MAP_ENTRY_CLASS, 15));
+            add(GryoTypeReg.of(Locale.class, 22));
+            add(GryoTypeReg.of(StringBuffer.class, 43));
+            add(GryoTypeReg.of(StringBuilder.class, 44));
+            add(GryoTypeReg.of(T.class, 48));
+            add(GryoTypeReg.of(TimeZone.class, 42));
+            add(GryoTypeReg.of(TreeMap.class, 45));
+            add(GryoTypeReg.of(TreeSet.class, 50));
+            add(GryoTypeReg.of(UUID.class, 17, new UUIDSerializer()));
+            add(GryoTypeReg.of(URI.class, 72, new URISerializer()));
+            add(GryoTypeReg.of(VertexTerminator.class, 13));
+
+            add(GryoTypeReg.of(ReferenceEdge.class, 81));
+            add(GryoTypeReg.of(ReferenceVertexProperty.class, 82));
+            add(GryoTypeReg.of(ReferenceProperty.class, 83));
+            add(GryoTypeReg.of(ReferenceVertex.class, 84));
+            add(GryoTypeReg.of(ReferencePath.class, 85));
+
+            add(GryoTypeReg.of(StarGraph.class, 86, new StarGraphSerializer(Direction.BOTH, new GraphFilter())));
+
+            add(GryoTypeReg.of(Edge.class, 65, new GryoSerializers.EdgeSerializer()));
+            add(GryoTypeReg.of(Vertex.class, 66, new GryoSerializers.VertexSerializer()));
+            add(GryoTypeReg.of(Property.class, 67, new GryoSerializers.PropertySerializer()));
+            add(GryoTypeReg.of(VertexProperty.class, 68, new GryoSerializers.VertexPropertySerializer()));
+            add(GryoTypeReg.of(Path.class, 59, new GryoSerializers.PathSerializer()));
             // skip 55
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(B_O_Traverser.class, null, 75));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(O_Traverser.class, null, 76));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(B_LP_O_P_S_SE_SL_Traverser.class, null, 77));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(B_O_S_SE_SL_Traverser.class, null, 78));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(B_LP_O_S_SE_SL_Traverser.class, null, 87));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(O_OB_S_SE_SL_Traverser.class, null, 89));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LP_O_OB_S_SE_SL_Traverser.class, null, 90));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LP_O_OB_P_S_SE_SL_Traverser.class, null, 91));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraverserSet.class, null, 58));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Tree.class, null, 61));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(HashSet.class, null, 62));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(BulkSet.class, null, 64));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MutableMetrics.class, null, 69));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ImmutableMetrics.class, null, 115));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(DefaultTraversalMetrics.class, null, 70));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MapMemory.class, null, 73));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MapReduce.NullObject.class, null, 74));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(AtomicLong.class, null, 79));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Pair.class, kryo -> new PairSerializer(), 88));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TraversalExplanation.class, kryo -> new JavaSerializer(), 106));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Duration.class, kryo -> new JavaTimeSerializers.DurationSerializer(), 93));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Instant.class, kryo -> new JavaTimeSerializers.InstantSerializer(), 94));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LocalDate.class, kryo -> new JavaTimeSerializers.LocalDateSerializer(), 95));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LocalDateTime.class, kryo -> new JavaTimeSerializers.LocalDateTimeSerializer(), 96));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(LocalTime.class, kryo -> new JavaTimeSerializers.LocalTimeSerializer(), 97));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MonthDay.class, kryo -> new JavaTimeSerializers.MonthDaySerializer(), 98));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(OffsetDateTime.class, kryo -> new JavaTimeSerializers.OffsetDateTimeSerializer(), 99));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(OffsetTime.class, kryo -> new JavaTimeSerializers.OffsetTimeSerializer(), 100));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Period.class, kryo -> new JavaTimeSerializers.PeriodSerializer(), 101));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Year.class, kryo -> new JavaTimeSerializers.YearSerializer(), 102));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(YearMonth.class, kryo -> new JavaTimeSerializers.YearMonthSerializer(), 103));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZonedDateTime.class, kryo -> new JavaTimeSerializers.ZonedDateTimeSerializer(), 104));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ZoneOffset.class, kryo -> new JavaTimeSerializers.ZoneOffsetSerializer(), 105));
-
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Operator.class, null, 107));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(FoldStep.FoldBiOperator.class, null, 108));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupCountStep.GroupCountBiOperator.class, null, 109));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStep.GroupBiOperator.class, kryo -> new JavaSerializer(), 117)); // because they contain traversals
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanGlobalBiOperator.class, null, 110));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanNumber.class, null, 111));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeStep.TreeBiOperator.class, null, 112));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStepV3d0.GroupBiOperatorV3d0.class, null, 113));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(RangeGlobalStep.RangeBiOperator.class, null, 114));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(OrderGlobalStep.OrderBiOperator.class, kryo -> new JavaSerializer(), 118)); // because they contain traversals
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(ProfileStep.ProfileBiOperator.class, null, 119)); // ***LAST ID***
+            add(GryoTypeReg.of(B_O_Traverser.class, 75));
+            add(GryoTypeReg.of(O_Traverser.class, 76));
+            add(GryoTypeReg.of(B_LP_O_P_S_SE_SL_Traverser.class, 77));
+            add(GryoTypeReg.of(B_O_S_SE_SL_Traverser.class, 78));
+            add(GryoTypeReg.of(B_LP_O_S_SE_SL_Traverser.class, 87));
+            add(GryoTypeReg.of(O_OB_S_SE_SL_Traverser.class, 89));
+            add(GryoTypeReg.of(LP_O_OB_S_SE_SL_Traverser.class, 90));
+            add(GryoTypeReg.of(LP_O_OB_P_S_SE_SL_Traverser.class, 91));
+
+            add(GryoTypeReg.of(TraverserSet.class, 58));
+            add(GryoTypeReg.of(Tree.class, 61));
+            add(GryoTypeReg.of(HashSet.class, 62));
+            add(GryoTypeReg.of(BulkSet.class, 64));
+            add(GryoTypeReg.of(MutableMetrics.class, 69));
+            add(GryoTypeReg.of(ImmutableMetrics.class, 115));
+            add(GryoTypeReg.of(DefaultTraversalMetrics.class, 70));
+            add(GryoTypeReg.of(MapMemory.class, 73));
+            add(GryoTypeReg.of(MapReduce.NullObject.class, 74));
+            add(GryoTypeReg.of(AtomicLong.class, 79));
+            add(GryoTypeReg.of(Pair.class, 88, new PairSerializer()));
+            add(GryoTypeReg.of(TraversalExplanation.class, 106, new JavaSerializer()));
+
+            add(GryoTypeReg.of(Duration.class, 93, new JavaTimeSerializers.DurationSerializer()));
+            add(GryoTypeReg.of(Instant.class, 94, new JavaTimeSerializers.InstantSerializer()));
+            add(GryoTypeReg.of(LocalDate.class, 95, new JavaTimeSerializers.LocalDateSerializer()));
+            add(GryoTypeReg.of(LocalDateTime.class, 96, new JavaTimeSerializers.LocalDateTimeSerializer()));
+            add(GryoTypeReg.of(LocalTime.class, 97, new JavaTimeSerializers.LocalTimeSerializer()));
+            add(GryoTypeReg.of(MonthDay.class, 98, new JavaTimeSerializers.MonthDaySerializer()));
+            add(GryoTypeReg.of(OffsetDateTime.class, 99, new JavaTimeSerializers.OffsetDateTimeSerializer()));
+            add(GryoTypeReg.of(OffsetTime.class, 100, new JavaTimeSerializers.OffsetTimeSerializer()));
+            add(GryoTypeReg.of(Period.class, 101, new JavaTimeSerializers.PeriodSerializer()));
+            add(GryoTypeReg.of(Year.class, 102, new JavaTimeSerializers.YearSerializer()));
+            add(GryoTypeReg.of(YearMonth.class, 103, new JavaTimeSerializers.YearMonthSerializer()));
+            add(GryoTypeReg.of(ZonedDateTime.class, 104, new JavaTimeSerializers.ZonedDateTimeSerializer()));
+            add(GryoTypeReg.of(ZoneOffset.class, 105, new JavaTimeSerializers.ZoneOffsetSerializer()));
+
+            add(GryoTypeReg.of(Operator.class, 107));
+            add(GryoTypeReg.of(FoldStep.FoldBiOperator.class, 108));
+            add(GryoTypeReg.of(GroupCountStep.GroupCountBiOperator.class, 109));
+            add(GryoTypeReg.of(GroupStep.GroupBiOperator.class, 117, new JavaSerializer())); // because they contain traversals
+            add(GryoTypeReg.of(MeanGlobalStep.MeanGlobalBiOperator.class, 110));
+            add(GryoTypeReg.of(MeanGlobalStep.MeanNumber.class, 111));
+            add(GryoTypeReg.of(TreeStep.TreeBiOperator.class, 112));
+            add(GryoTypeReg.of(GroupStepV3d0.GroupBiOperatorV3d0.class, 113));
+            add(GryoTypeReg.of(RangeGlobalStep.RangeBiOperator.class, 114));
+            add(GryoTypeReg.of(OrderGlobalStep.OrderBiOperator.class, 118, new JavaSerializer())); // because they contain traversals
+            add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119)); // ***LAST ID***
         }};
 
         private final List<IoRegistry> registries = new ArrayList<>();
@@ -404,8 +405,8 @@ public final class GryoMapper implements Mapper<Kryo> {
          */
         public Builder addCustom(final Class... custom) {
             if (custom != null && custom.length > 0) {
-                for (Class clazz : custom) {
-                    addCustom(clazz, (Function<Kryo, Serializer>) null);
+                for (Class c : custom) {
+                    addOrOverrideRegistration(GryoTypeReg.of(c, currentSerializationId.getAndIncrement()));
                 }
             }
             return this;
@@ -416,10 +417,15 @@ public final class GryoMapper implements Mapper<Kryo> {
          * a class that is already registered will override that registration.
          */
         public Builder addCustom(final Class clazz, final Serializer serializer) {
-            if (null == serializer)
-                addCustom(clazz);
-            else
-                addCustom(clazz, kryo -> serializer);
+            addOrOverrideRegistration(GryoTypeReg.of(clazz, currentSerializationId.getAndIncrement(), serializer));
+            return this;
+        }
+
+        /**
+         * Register custom class to serialize with a custom serialization shim.
+         */
+        public Builder addCustom(final Class clazz, final SerializerShim serializer) {
+            addOrOverrideRegistration(GryoTypeReg.of(clazz, currentSerializationId.getAndIncrement(), serializer));
             return this;
         }
 
@@ -427,15 +433,8 @@ public final class GryoMapper implements Mapper<Kryo> {
          * Register a custom class to serialize with a custom serializer as returned from a {@link Function}. Note
          * that calling this method for a class that is already registered will override that registration.
          */
-        public Builder addCustom(final Class clazz, final Function<Kryo, Serializer> serializer) {
-            final Optional<Triplet<Class, Function<Kryo, Serializer>, Integer>> found = findSerializer(clazz);
-            if (found.isPresent()) {
-                final Triplet<Class, Function<Kryo, Serializer>, Integer> t = found.get();
-                serializationList.remove(t);
-                serializationList.add(t.setAt1(serializer));
-            } else
-                serializationList.add(Triplet.with(clazz, serializer, currentSerializationId.getAndIncrement()));
-
+        public Builder addCustom(final Class clazz, final Function<Kryo, Serializer> functionOfKryo) {
+            addOrOverrideRegistration(GryoTypeReg.of(clazz, currentSerializationId.getAndIncrement(), functionOfKryo));
             return this;
         }
 
@@ -481,23 +480,123 @@ public final class GryoMapper implements Mapper<Kryo> {
                         addCustom(p.getValue0(), (Function<Kryo, Serializer>) p.getValue1());
                     else
                         throw new IllegalStateException(String.format(
-                                "Unexpected value provided by the %s for %s - expects [null, %s implementation or Function<%s, %s>]",
-                                IoRegistry.class.getSimpleName(), p.getValue0().getClass().getSimpleName(),
+                                "Unexpected value provided by %s for serializable class %s - expected a parameter in [null, %s implementation or Function<%s, %s>], but received %s",
+                                registry.getClass().getSimpleName(), p.getValue0().getClass().getCanonicalName(),
                                 Serializer.class.getName(), Kryo.class.getSimpleName(),
-                                Serializer.class.getSimpleName()));
+                                Serializer.class.getSimpleName(), p.getValue1()));
                 });
             });
 
             return new GryoMapper(this);
         }
 
-        private Optional<Triplet<Class, Function<Kryo, Serializer>, Integer>> findSerializer(final Class clazz) {
-            final Iterator<Triplet<Class, Function<Kryo, Serializer>, Integer>> itty = IteratorUtils.filter(
-                    serializationList, t -> t.getValue0().equals(clazz)).iterator();
-            if (itty.hasNext())
-                return Optional.of(itty.next());
-            else
-                return Optional.empty();
+        private <T> void addOrOverrideRegistration(TypeRegistration<T> newRegistration) {
+            Iterator<TypeRegistration<?>> iter = typeRegistrations.iterator();
+            while (iter.hasNext()) {
+                TypeRegistration<?> existingRegistration = iter.next();
+                if (existingRegistration.getTargetClass().equals(newRegistration.getTargetClass())) {
+                    iter.remove();
+                    break;
+                }
+            }
+            typeRegistrations.add(newRegistration);
+        }
+    }
+
+    private static class GryoTypeReg<T> implements TypeRegistration<T> {
+
+        private final Class<T> clazz;
+        private final Serializer<T> shadedSerializer;
+        private final SerializerShim<T> serializerShim;
+        private final Function<Kryo, Serializer> functionOfShadedKryo;
+        private final int id;
+
+        private GryoTypeReg(Class<T> clazz,
+                            Serializer<T> shadedSerializer,
+                            SerializerShim<T> serializerShim,
+                            Function<Kryo, Serializer> functionOfShadedKryo,
+                            int id) {
+            this.clazz = clazz;
+            this.shadedSerializer = shadedSerializer;
+            this.serializerShim = serializerShim;
+            this.functionOfShadedKryo = functionOfShadedKryo;
+            this.id = id;
+
+            int serializerCount = 0;
+            if (null != this.shadedSerializer)
+                serializerCount++;
+            if (null != this.serializerShim)
+                serializerCount++;
+            if (null != this.functionOfShadedKryo)
+                serializerCount++;
+
+            if (1 < serializerCount) {
+                String msg = String.format(
+                        "GryoTypeReg accepts at most one kind of serializer, but multiple " +
+                                "serializers were supplied for class %s (id %s).  " +
+                                "Shaded serializer: %s.  Shim serializer: %s.  Shaded serializer function: %s.",
+                        this.clazz.getCanonicalName(), id,
+                        this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo);
+                throw new IllegalArgumentException(msg);
+            }
+        }
+
+        private static <T> GryoTypeReg<T> of(Class<T> clazz, int id) {
+            return new GryoTypeReg<>(clazz, null, null, null, id);
+        }
+
+        private static <T> GryoTypeReg<T> of(Class<T> clazz, int id, Serializer<T> shadedSerializer) {
+            return new GryoTypeReg<>(clazz, shadedSerializer, null, null, id);
+        }
+
+        private static <T> GryoTypeReg<T> of(Class<T> clazz, int id, SerializerShim<T> serializerShim) {
+            return new GryoTypeReg<>(clazz, null, serializerShim, null, id);
+        }
+
+        private static <T> GryoTypeReg<T> of(Class clazz, int id, Function<Kryo, Serializer> fct) {
+            return new GryoTypeReg<>(clazz, null, null, fct, id);
+        }
+
+        @Override
+        public Serializer<T> getShadedSerializer() {
+            return shadedSerializer;
+        }
+
+        @Override
+        public SerializerShim<T> getSerializerShim() {
+            return serializerShim;
+        }
+
+        @Override
+        public Function<Kryo, Serializer> getFunctionOfShadedKryo() {
+            return functionOfShadedKryo;
+        }
+
+        @Override
+        public Class<T> getTargetClass() {
+            return clazz;
+        }
+
+        @Override
+        public int getId() {
+            return id;
+        }
+
+        @Override
+        public Kryo registerWith(Kryo kryo) {
+            if (null != functionOfShadedKryo)
+                kryo.register(clazz, functionOfShadedKryo.apply(kryo), id);
+            else if (null != shadedSerializer)
+                kryo.register(clazz, shadedSerializer, id);
+            else if (null != serializerShim)
+                kryo.register(clazz, new ShadedSerializerAdapter<>(serializerShim), id);
+            else {
+                kryo.register(clazz, kryo.getDefaultSerializer(clazz), id);
+                // Suprisingly, the preceding call is not equivalent to
+                //   kryo.register(clazz, id);
+            }
+
+            return kryo;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
new file mode 100644
index 0000000..5ca3f31
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+
+/**
+ * Represents a class serializable with Gryo.
+ * <p>
+ * At most one of the {@link #getShadedSerializer()}, {@link #getSerializerShim()},
+ * and {@link #getFunctionOfShadedKryo()} will return a non-null value.  If all
+ * three methods return null, then there is no custom serialization logic associated
+ * with this class.  Gryo/Kryo will use its default serializer.
+ *
+ * @param <T> the serializable type
+ */
+public interface TypeRegistration<T> {
+
+    /**
+     * @return the serializable class this instance describes
+     */
+    Class<T> getTargetClass();
+
+    /**
+     * @return numeric identifier used as a shorthand for this type in Gryo's serialized form
+     */
+    int getId();
+
+    /**
+     * @return the shaded-Kryo serializer that handles this type, if one is defined
+     */
+    Serializer<T> getShadedSerializer();
+
+    /**
+     * @return the shim-Kryo serializer that handles this type, if one is defined
+     */
+    SerializerShim<T> getSerializerShim();
+
+    /**
+     * @return a function that accepts a shaded-Kryo instance and returns a serializer, if such a function is defined
+     */
+    java.util.function.Function<Kryo, Serializer> getFunctionOfShadedKryo();
+
+    /**
+     * Registers this type on the supplied {@link Kryo} instance, using whatever custom serializer
+     * may be present, then returns the same {@linkplain Kryo} instance supplied as the parameter.
+     *
+     * @param kryo Kryo instance into which this type is registered
+     * @return the sole parameter
+     */
+    Kryo registerWith(Kryo kryo);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/URISerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/URISerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/URISerializer.java
index 7b6cfec..de08061 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/URISerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/URISerializer.java
@@ -18,29 +18,32 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
 
 import java.net.URI;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-final class URISerializer extends Serializer<URI> {
+final class URISerializer implements SerializerShim<URI> {
 
-    public URISerializer() {
-        setImmutable(true);
-    }
+    public URISerializer() { }
 
     @Override
-    public void write(final Kryo kryo, final Output output, final URI uri) {
+    public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final URI uri) {
         output.writeString(uri.toString());
     }
 
     @Override
-    public URI read(final Kryo kryo, final Input input, final Class<URI> uriClass) {
+    public <I extends InputShim> URI read(final KryoShim<I, ?> kryo, final I input, final Class<URI> uriClass) {
         return URI.create(input.readString());
     }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/UUIDSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/UUIDSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/UUIDSerializer.java
index e694e38..b86ddc2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/UUIDSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/UUIDSerializer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.Serializer;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
@@ -28,19 +32,23 @@ import java.util.UUID;
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-final class UUIDSerializer extends Serializer<UUID> {
-    public UUIDSerializer() {
-        setImmutable(true);
-    }
+final class UUIDSerializer implements SerializerShim<UUID> {
+
+    public UUIDSerializer() { }
 
     @Override
-    public void write(final Kryo kryo, final Output output, final UUID uuid) {
+    public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final UUID uuid) {
         output.writeLong(uuid.getMostSignificantBits());
         output.writeLong(uuid.getLeastSignificantBits());
     }
 
     @Override
-    public UUID read(final Kryo kryo, final Input input, final Class<UUID> uuidClass) {
+    public <I extends InputShim> UUID read(final KryoShim<I, ?> kryo, final I input, final Class<UUID> uuidClass) {
         return new UUID(input.readLong(), input.readLong());
     }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/InputShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/InputShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/InputShim.java
new file mode 100644
index 0000000..6d00884
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/InputShim.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+/**
+ * A minimal {@link org.apache.tinkerpop.shaded.kryo.io.Input}-like abstraction.
+ */
+public interface InputShim {
+
+    byte readByte();
+
+    byte[] readBytes(int size);
+
+    String readString();
+
+    long readLong();
+
+    int readInt();
+
+    double readDouble();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShim.java
new file mode 100644
index 0000000..e2a95e6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShim.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+/**
+ * A minimal {@link org.apache.tinkerpop.shaded.kryo.Kryo}-like abstraction.
+ *
+ * @param <I> this interface's complementary InputShim
+ * @param <O> this interface's complementary OutputShim
+ */
+public interface KryoShim<I extends InputShim, O extends OutputShim> {
+
+    <T> T readObject(I input, Class<T> type);
+
+    Object readClassAndObject(I input);
+
+    void writeObject(O output, Object object);
+
+    void writeClassAndObject(O output, Object object);
+
+    <T> T readObjectOrNull(I input, Class<T> type);
+
+    void writeObjectOrNull(O output, Object object, Class type);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimService.java
new file mode 100644
index 0000000..024d40c
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This interface exists to decouple HadoopPools from TinkerPop's shaded Kryo.
+ * <p>
+ * VertexWritable and ObjectWritable formerly implemented Serializable by
+ * resorting to statically-pooled shaded Kryo instances maintained by the HadoopPools class.
+ * This is awkward because those shaded Kryo instances require class registration by default.
+ * <p>
+ * Consider what happens with custom property datatypes reachable from the reference graph rooted at an ObjectWritable
+ * or VertexWritable instance.  It is not enough for these property classes to merely implement
+ * Serializable, though one think that from skimming ObjectWritable/VertexWritable.  Those classes
+ * must also register with TinkerPop's internal, shaded Kryo instances as maintained by HadoopPools,
+ * or else configure those instances to accept unregistered classes.
+ * Otherwise, TinkerPop's shaded Kryo will refuse to serialize those properties (even though
+ * they implement Serializable, and even though the user might think they are only using
+ * Java's standard Serialization mechanism!).
+ * <p>
+ * By hiding the mechanics of serialization behind this interface instead of hardcoding it in
+ * HadoopPools, the user can decide how to implement serialization for ObjectWritable/VertexWritable
+ * (and whatever other classes in TinkerPop decide to implement Serializable but then delegate
+ * all of the implementation details, like ObjectWritable/VertexWritable do now).
+ */
+public interface KryoShimService {
+
+    /**
+     * Deserializes an object from an input stream.
+     *
+     * @param source the stream from which to read an object's serialized form
+     * @return the first deserialized object available from {@code source}
+     */
+    Object readClassAndObject(InputStream source);
+
+    /**
+     * Serializes an object to an output stream.  This may flush the output stream.
+     *
+     * @param o the object to serialize
+     * @param sink the stream into which the serialized object is written
+     */
+    void writeClassAndObject(Object o, OutputStream sink);
+
+    /**
+     * Returns this service's relative priority number.  Unless explicitly overridden through a
+     * system property ({@link KryoShimServiceLoader#SHIM_CLASS_SYSTEM_PROPERTY}),
+     * the service implementation with the numerically highest priority will be used
+     * and all others ignored.  In other words, the highest priority wins (in the absence of a
+     * system property override).
+     * <p>
+     * TinkerPop's current default implementation uses priority value zero.
+     * <p>
+     * Third-party implementations of this interface should (but are not technically required)
+     * to use a priority value with absolute value greater than 100.
+     * <p>
+     * The implementation currently breaks priority ties by lexicographical comparison of
+     * fully-qualified package-and-classname, but this tie-breaking behavior should be
+     * considered undefined and subject to future change.  Ties are ignored if the service
+     * is explicitly set through the system property mentioned above.
+     *
+     * @return this implementation's priority value
+     */
+    int getPriority();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimServiceLoader.java
new file mode 100644
index 0000000..1d5413d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/KryoShimServiceLoader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ServiceLoader;
+
+public class KryoShimServiceLoader {
+
+    private static volatile KryoShimService CACHED_SHIM_SERVICE;
+
+    private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class);
+
+    /**
+     * Set this system property to the fully-qualified name of a {@link KryoShimService}
+     * package-and-classname to force it into service.  Setting this property causes the
+     * priority-selection mechanism ({@link KryoShimService#getPriority()}) to be ignored.
+     */
+    public static final String SHIM_CLASS_SYSTEM_PROPERTY = "tinkerpop.kryo.shim";
+
+    public static KryoShimService load(boolean forceReload) {
+
+        if (null != CACHED_SHIM_SERVICE && !forceReload) {
+            return CACHED_SHIM_SERVICE;
+        }
+
+        ArrayList<KryoShimService> services = new ArrayList<>();
+
+        ServiceLoader<KryoShimService> sl = ServiceLoader.load(KryoShimService.class);
+
+        KryoShimService result = null;
+
+        synchronized (KryoShimServiceLoader.class) {
+            if (forceReload) {
+                sl.reload();
+            }
+
+            for (KryoShimService kss : sl) {
+                services.add(kss);
+            }
+        }
+
+        String shimClass = System.getProperty(SHIM_CLASS_SYSTEM_PROPERTY);
+
+        if (null != shimClass) {
+            for (KryoShimService kss : services) {
+                if (kss.getClass().getCanonicalName().equals(shimClass)) {
+                    log.info("Set {} provider to {} ({}) from system property {}={}",
+                            KryoShimService.class.getSimpleName(), kss, kss.getClass(),
+                            SHIM_CLASS_SYSTEM_PROPERTY, shimClass);
+                    result = kss;
+                }
+            }
+        } else {
+            Collections.sort(services, KryoShimServiceComparator.INSTANCE);
+
+            for (KryoShimService kss : services) {
+                log.debug("Found Kryo shim service class {} (priority {})", kss.getClass(), kss.getPriority());
+            }
+
+            if (0 != services.size()) {
+                result = services.get(services.size() - 1);
+            }
+        }
+
+
+        if (null == result) {
+            throw new IllegalStateException("Unable to load KryoShimService");
+        }
+
+        log.info("Set {} provider to {} ({}) because its priority value ({}) is the highest available",
+                KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority());
+
+        return CACHED_SHIM_SERVICE = result;
+    }
+
+    public static KryoShimService load() {
+        return load(false);
+    }
+
+    public static byte[] writeClassAndObjectToBytes(Object o) {
+        KryoShimService shimService = load();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        shimService.writeClassAndObject(o, baos);
+
+        return baos.toByteArray();
+    }
+
+    public static <T> T readClassAndObject(InputStream source) {
+        KryoShimService shimService = load();
+
+        return (T)shimService.readClassAndObject(source);
+    }
+
+    private enum KryoShimServiceComparator implements Comparator<KryoShimService> {
+        INSTANCE;
+
+        @Override
+        public int compare(KryoShimService a, KryoShimService b) {
+            int ap = a.getPriority();
+            int bp = b.getPriority();
+
+            if (ap < bp) {
+                return -1;
+            } else if (bp < ap) {
+                return 1;
+            } else {
+                return a.getClass().getCanonicalName().compareTo(b.getClass().getCanonicalName());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/OutputShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/OutputShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/OutputShim.java
new file mode 100644
index 0000000..4468434
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/OutputShim.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+import java.io.IOException;
+
+/**
+ * A minimal {@link org.apache.tinkerpop.shaded.kryo.io.Output}-like abstraction.
+ */
+public interface OutputShim {
+
+    void writeByte(byte b);
+
+    void writeBytes(byte[] array, int offset, int count);
+
+    void writeString(String s);
+
+    void writeLong(long l);
+
+    void writeInt(int i);
+
+    void writeDouble(double d);
+
+    void flush();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/SerializerShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/SerializerShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/SerializerShim.java
new file mode 100644
index 0000000..686350d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/SerializerShim.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
+
+/**
+ * A minimal {@link org.apache.tinkerpop.shaded.kryo.Serializer}-like abstraction.
+ *
+ * @param <T> the class this serializer reads/writes from/to bytes.
+ */
+public interface SerializerShim<T> {
+
+    <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T starGraph);
+
+    <I extends InputShim> T read(KryoShim<I, ?> kryo, I input, Class<T> clazz);
+
+    default boolean isImmutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/package-info.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/package-info.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/package-info.java
new file mode 100644
index 0000000..436f117
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/package-info.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Abstracts a minimal subset of Kryo types and methods.
+ * <p>
+ * Kryo is often shaded.  For instance, TinkerPop's Gryo
+ * serializer relies on a shaded Kryo package.
+ * TinkerPop serializers written against a particular shaded
+ * Kryo package (or the unshaded Kryo package) are compatible
+ * only with that package.  In contrast, TinkerPop serializers written
+ * against this abstraction can be used with any shaded or
+ * unshaded Kryo package, so long as the signatures and behavior
+ * of the methods in this package remain stable.
+ * <p>
+ * To show how this is useful, consider
+ * {@link org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer}.
+ * This class has logic unique to TinkerPop that performs
+ * efficient and forward-compatible serialization of
+ * {@link org.apache.tinkerpop.gremlin.structure.util.star.StarGraph}
+ * instances.  It takes advantage of package-level visibility
+ * and the fact that it shares a package with its target,
+ * so it would be challenging to cleanly and naturally replicate
+ * (i.e. without package spoofing or runtime visibility overrides).
+ * By implementing
+ * {@link org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim}
+ * instead of, say, Gryo's shaded
+ * {@link org.apache.tinkerpop.shaded.kryo.Serializer},
+ * it can be used with anybody's Kryo package, regardless of whether
+ * that package is shaded or not.  This lets third-parties reuse
+ * TinkerPop's efficient, internals-aware StarGraph serializer on
+ * their own serialization platform (and without altering
+ * TinkerPop's bytecode, let alone its source).
+ * <p>
+ * The number of types and methods in this
+ * package is deliberately small to reduce the likelihood of a
+ * new Kryo release introducing an incompatible change.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedInputAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedInputAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedInputAdapter.java
new file mode 100644
index 0000000..2872326
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedInputAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+
+public class ShadedInputAdapter implements InputShim {
+
+    private final Input shadedInput;
+
+    public ShadedInputAdapter(Input shadedInput) {
+        this.shadedInput = shadedInput;
+    }
+
+    Input getShadedInput() {
+        return shadedInput;
+    }
+
+    @Override
+    public byte readByte()
+    {
+        return shadedInput.readByte();
+    }
+
+    @Override
+    public byte[] readBytes(int size) {
+        return shadedInput.readBytes(size);
+    }
+
+    @Override
+    public String readString() {
+        return shadedInput.readString();
+    }
+
+    @Override
+    public long readLong() {
+        return shadedInput.readLong();
+    }
+
+    @Override
+    public int readInt() {
+        return shadedInput.readInt();
+    }
+
+    @Override
+    public double readDouble() {
+        return shadedInput.readDouble();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedKryoAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedKryoAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedKryoAdapter.java
new file mode 100644
index 0000000..0e85f6f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedKryoAdapter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+
+public class ShadedKryoAdapter implements KryoShim<ShadedInputAdapter, ShadedOutputAdapter> {
+
+    private final Kryo shadedKryo;
+
+    public ShadedKryoAdapter(Kryo shadedKryo) {
+        this.shadedKryo = shadedKryo;
+    }
+
+    @Override
+    public <T> T readObject(ShadedInputAdapter input, Class<T> type)
+    {
+        return shadedKryo.readObject(input.getShadedInput(), type);
+    }
+
+    @Override
+    public Object readClassAndObject(ShadedInputAdapter input)
+    {
+        return shadedKryo.readClassAndObject(input.getShadedInput());
+    }
+
+    @Override
+    public void writeObject(ShadedOutputAdapter output, Object object)
+    {
+        shadedKryo.writeObject(output.getShadedOutput(), object);
+    }
+
+    @Override
+    public void writeClassAndObject(ShadedOutputAdapter output, Object object)
+    {
+        shadedKryo.writeClassAndObject(output.getShadedOutput(), object);
+    }
+
+    @Override
+    public <T> T readObjectOrNull(ShadedInputAdapter input, Class<T> type)
+    {
+        return shadedKryo.readObjectOrNull(input.getShadedInput(), type);
+    }
+
+    @Override
+    public void writeObjectOrNull(ShadedOutputAdapter output, Object object, Class type)
+    {
+        shadedKryo.writeObjectOrNull(output.getShadedOutput(), object, type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedOutputAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedOutputAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedOutputAdapter.java
new file mode 100644
index 0000000..5ca0ecb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedOutputAdapter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+public class ShadedOutputAdapter implements OutputShim {
+
+    private final Output shadedOutput;
+
+    public ShadedOutputAdapter(Output shadedOutput) {
+        this.shadedOutput = shadedOutput;
+    }
+
+    @Override
+    public void writeByte(byte b)
+    {
+        shadedOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] array, int offset, int count) {
+        shadedOutput.writeBytes(array, offset, count);
+    }
+
+    @Override
+    public void writeString(String s) {
+        shadedOutput.writeString(s);
+    }
+
+    @Override
+    public void writeLong(long l) {
+        shadedOutput.writeLong(l);
+    }
+
+    @Override
+    public void writeInt(int i) {
+        shadedOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeDouble(double d) {
+        shadedOutput.writeDouble(d);
+    }
+
+    @Override
+    public void flush() {
+        shadedOutput.flush();
+    }
+
+    Output getShadedOutput()
+    {
+        return shadedOutput;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedSerializerAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedSerializerAdapter.java
new file mode 100644
index 0000000..ebfd0fb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/ShadedSerializerAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+public class ShadedSerializerAdapter<T> extends Serializer<T> {
+
+    SerializerShim<T> serializer;
+
+    public ShadedSerializerAdapter(SerializerShim<T> serializer) {
+        this.serializer = serializer;
+        setImmutable(this.serializer.isImmutable());
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, T t) {
+        /* These adapters could be cached pretty efficiently in instance fields if it were guaranteed that this
+         * class was never subject to concurrent use.  That's true of Kryo instances, but it is not clear that
+         * it is true of Serializer instances.
+         */
+        ShadedKryoAdapter shadedKryoAdapter = new ShadedKryoAdapter(kryo);
+        ShadedOutputAdapter shadedOutputAdapter = new ShadedOutputAdapter(output);
+        serializer.write(shadedKryoAdapter, shadedOutputAdapter, t);
+    }
+
+    @Override
+    public T read(Kryo kryo, Input input, Class<T> aClass) {
+        // Same caching opportunity as in write(...)
+        ShadedKryoAdapter shadedKryoAdapter = new ShadedKryoAdapter(kryo);
+        ShadedInputAdapter shadedInputAdapter = new ShadedInputAdapter(input);
+        return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/package-info.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/package-info.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/package-info.java
new file mode 100644
index 0000000..ea70003
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/kryoshim/shaded/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implementations of the interfaces in
+ * {@link org.apache.tinkerpop.gremlin.structure.io.kryoshim}
+ * using TinkerPop's shaded copy of Kryo.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
index 3805e2c..2acd770 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphGryoSerializer.java
@@ -18,20 +18,13 @@
  */
 package org.apache.tinkerpop.gremlin.structure.util.star;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
-
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.shaded.ShadedSerializerAdapter;
+
 /**
  * Kryo serializer for {@link StarGraph}.  Implements an internal versioning capability for backward compatibility.
  * The single byte at the front of the serialization stream denotes the version.  That version can be used to choose
@@ -42,15 +35,10 @@ import java.util.Map;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
+public final class StarGraphGryoSerializer extends ShadedSerializerAdapter<StarGraph>  {
 
     private static final Map<Direction, StarGraphGryoSerializer> CACHE = new HashMap<>();
 
-    private final Direction edgeDirectionToSerialize;
-    private GraphFilter graphFilter = new GraphFilter(); // will allow all vertices/edges
-
-    private final static byte VERSION_1 = Byte.MIN_VALUE;
-
     static {
         CACHE.put(Direction.BOTH, new StarGraphGryoSerializer(Direction.BOTH));
         CACHE.put(Direction.IN, new StarGraphGryoSerializer(Direction.IN));
@@ -58,8 +46,12 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
         CACHE.put(null, new StarGraphGryoSerializer(null));
     }
 
+    private StarGraphGryoSerializer(final Direction edgeDirectionToSerialize, final GraphFilter graphFilter) {
+        super(new StarGraphSerializer(edgeDirectionToSerialize, graphFilter));
+    }
+
     private StarGraphGryoSerializer(final Direction edgeDirectionToSerialize) {
-        this.edgeDirectionToSerialize = edgeDirectionToSerialize;
+        this(edgeDirectionToSerialize, new GraphFilter());
     }
 
     /**
@@ -71,100 +63,7 @@ public final class StarGraphGryoSerializer extends Serializer<StarGraph> {
     }
 
     public static StarGraphGryoSerializer withGraphFilter(final GraphFilter graphFilter) {
-        final StarGraphGryoSerializer serializer = new StarGraphGryoSerializer(Direction.BOTH);
-        serializer.graphFilter = graphFilter.clone();
+        final StarGraphGryoSerializer serializer = new StarGraphGryoSerializer(Direction.BOTH, graphFilter.clone());
         return serializer;
     }
-
-    @Override
-    public void write(final Kryo kryo, final Output output, final StarGraph starGraph) {
-        output.writeByte(VERSION_1);
-        kryo.writeObjectOrNull(output, starGraph.edgeProperties, HashMap.class);
-        kryo.writeObjectOrNull(output, starGraph.metaProperties, HashMap.class);
-        kryo.writeClassAndObject(output, starGraph.starVertex.id);
-        kryo.writeObject(output, starGraph.starVertex.label);
-        writeEdges(kryo, output, starGraph, Direction.IN);
-        writeEdges(kryo, output, starGraph, Direction.OUT);
-        kryo.writeObject(output, null != starGraph.starVertex.vertexProperties);
-        if (null != starGraph.starVertex.vertexProperties) {
-            kryo.writeObject(output, starGraph.starVertex.vertexProperties.size());
-            for (final Map.Entry<String, List<VertexProperty>> vertexProperties : starGraph.starVertex.vertexProperties.entrySet()) {
-                kryo.writeObject(output, vertexProperties.getKey());
-                kryo.writeObject(output, vertexProperties.getValue().size());
-                for (final VertexProperty vertexProperty : vertexProperties.getValue()) {
-                    kryo.writeClassAndObject(output, vertexProperty.id());
-                    kryo.writeClassAndObject(output, vertexProperty.value());
-                }
-            }
-        }
-    }
-
-    /**
-     * If the returned {@link StarGraph} is null, that means that the {@link GraphFilter} filtered the vertex.
-     */
-    @Override
-    public StarGraph read(final Kryo kryo, final Input input, final Class<StarGraph> aClass) {
-        final StarGraph starGraph = StarGraph.open();
-        input.readByte();  // version field ignored for now - for future use with backward compatibility
-        starGraph.edgeProperties = kryo.readObjectOrNull(input, HashMap.class);
-        starGraph.metaProperties = kryo.readObjectOrNull(input, HashMap.class);
-        starGraph.addVertex(T.id, kryo.readClassAndObject(input), T.label, kryo.readObject(input, String.class));
-        readEdges(kryo, input, starGraph, Direction.IN);
-        readEdges(kryo, input, starGraph, Direction.OUT);
-        if (kryo.readObject(input, Boolean.class)) {
-            final int numberOfUniqueKeys = kryo.readObject(input, Integer.class);
-            for (int i = 0; i < numberOfUniqueKeys; i++) {
-                final String vertexPropertyKey = kryo.readObject(input, String.class);
-                final int numberOfVertexPropertiesWithKey = kryo.readObject(input, Integer.class);
-                for (int j = 0; j < numberOfVertexPropertiesWithKey; j++) {
-                    final Object id = kryo.readClassAndObject(input);
-                    final Object value = kryo.readClassAndObject(input);
-                    starGraph.starVertex.property(VertexProperty.Cardinality.list, vertexPropertyKey, value, T.id, id);
-                }
-            }
-        }
-        return this.graphFilter.hasFilter() ? starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph;
-    }
-
-    private void writeEdges(final Kryo kryo, final Output output, final StarGraph starGraph, final Direction direction) {
-        // only write edges if there are some AND if the user requested them to be serialized AND if they match
-        // the direction being serialized by the format
-        final Map<String, List<Edge>> starEdges = direction.equals(Direction.OUT) ? starGraph.starVertex.outEdges : starGraph.starVertex.inEdges;
-        final boolean writeEdges = null != starEdges && edgeDirectionToSerialize != null
-                && (edgeDirectionToSerialize == direction || edgeDirectionToSerialize == Direction.BOTH);
-        kryo.writeObject(output, writeEdges);
-        if (writeEdges) {
-            kryo.writeObject(output, starEdges.size());
-            for (final Map.Entry<String, List<Edge>> edges : starEdges.entrySet()) {
-                kryo.writeObject(output, edges.getKey());
-                kryo.writeObject(output, edges.getValue().size());
-                for (final Edge edge : edges.getValue()) {
-                    kryo.writeClassAndObject(output, edge.id());
-                    kryo.writeClassAndObject(output, direction.equals(Direction.OUT) ? edge.inVertex().id() : edge.outVertex().id());
-                }
-            }
-        }
-    }
-
-    private void readEdges(final Kryo kryo, final Input input, final StarGraph starGraph, final Direction direction) {
-        if (kryo.readObject(input, Boolean.class)) {
-            final int numberOfUniqueLabels = kryo.readObject(input, Integer.class);
-            for (int i = 0; i < numberOfUniqueLabels; i++) {
-                final String edgeLabel = kryo.readObject(input, String.class);
-                final int numberOfEdgesWithLabel = kryo.readObject(input, Integer.class);
-                for (int j = 0; j < numberOfEdgesWithLabel; j++) {
-                    final Object edgeId = kryo.readClassAndObject(input);
-                    final Object adjacentVertexId = kryo.readClassAndObject(input);
-                    if (this.graphFilter.checkEdgeLegality(direction, edgeLabel).positive()) {
-                        if (direction.equals(Direction.OUT))
-                            starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
-                        else
-                            starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
-                    } else if (null != starGraph.edgeProperties) {
-                        starGraph.edgeProperties.remove(edgeId);
-                    }
-                }
-            }
-        }
-    }
 }