You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/30 00:07:16 UTC

[12/21] incubator-tinkerpop git commit: Refactoring GryoPool for spark usage.

Refactoring GryoPool for spark usage.

Removed getKryo() from GryoReader and GryoWriter.  Kryo instances can now be gotten directly from the GryoPool. Added configuration options to GryoMapper Builder to allow for lower-level settings to be passed to Kryo.  This allowed removal of the Consumer<Kryo> function passed to the GryoPool Builder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/d165ece9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/d165ece9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/d165ece9

Branch: refs/heads/thread-issue-tinkergraph
Commit: d165ece905ef5f34c2c00bbabd00ff74006e8dc5
Parents: c5827d2
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Sep 29 13:08:12 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Sep 29 13:08:12 2015 -0400

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoMapper.java   | 45 ++++++++--
 .../gremlin/structure/io/gryo/GryoPool.java     | 92 +++++++++++---------
 .../gremlin/structure/io/gryo/GryoReader.java   |  4 -
 .../gremlin/structure/io/gryo/GryoWriter.java   |  4 -
 .../structure/io/gryo/GryoMapperTest.java       | 21 +++++
 .../io/gryo/GryoDeserializationStream.groovy    |  2 +-
 .../io/gryo/GryoSerializationStream.java        |  2 +-
 .../spark/structure/io/gryo/GryoSerializer.java | 35 ++++----
 .../io/gryo/GryoSerializerInstance.java         | 10 +--
 9 files changed, 136 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 619d657..cba2497 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -122,16 +122,22 @@ public final class GryoMapper implements Mapper<Kryo> {
     public static final byte[] GIO = "gio".getBytes();
     public static final byte[] HEADER = Arrays.copyOf(GIO, 16);
     private final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList;
+    private boolean registrationRequired;
+    private boolean referenceTracking;
 
-    private GryoMapper(final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList) {
-        this.serializationList = serializationList;
+    private GryoMapper(final Builder builder) {
+        this.serializationList = builder.serializationList;
+        this.registrationRequired = builder.registrationRequired;
+        this.referenceTracking = builder.referenceTracking;
     }
 
     @Override
     public Kryo createMapper() {
         final Kryo kryo = new Kryo(new GryoClassResolver(), new MapReferenceResolver(), new DefaultStreamFactory());
         kryo.addDefaultSerializer(Map.Entry.class, new EntrySerializer());
-        kryo.setRegistrationRequired(true);
+        kryo.setRegistrationRequired(registrationRequired);
+        kryo.setReferences(referenceTracking);
+
         serializationList.forEach(p -> {
             final Function<Kryo, Serializer> serializer = p.getValue1();
             if (null == serializer)
@@ -251,13 +257,16 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Pair.class, kryo -> new PairSerializer(), 88)); // ***LAST ID**
         }};
 
-        private List<IoRegistry> registries = new ArrayList<>();
+        private final List<IoRegistry> registries = new ArrayList<>();
 
         /**
          * Starts numbering classes for Gryo serialization at 65536 to leave room for future usage by TinkerPop.
          */
         private final AtomicInteger currentSerializationId = new AtomicInteger(65536);
 
+        private boolean registrationRequired = true;
+        private boolean referenceTracking = true;
+
         private Builder() {
         }
 
@@ -299,6 +308,32 @@ public final class GryoMapper implements Mapper<Kryo> {
         }
 
         /**
+         * When set to {@code true}, all classes serialized by the {@code Kryo} instances created from this
+         * {@link GryoMapper} must have their classes known up front and registered appropriately through this
+         * builder.  By default this value is {@code true}.  This approach is more efficient than setting the
+         * value to {@code false}.
+         *
+         * @param registrationRequired set to {@code true} if the classes should be registered up front or
+         * {@code false} otherwise
+         */
+        public Builder registrationRequired(final boolean registrationRequired) {
+            this.registrationRequired = registrationRequired;
+            return this;
+        }
+
+        /**
+         * By default, each appearance of an object in the graph after the first is stored as an integer ordinal.
+         * This allows multiple references to the same object and cyclic graphs to be serialized. This has a small
+         * amount of overhead and can be disabled to save space if it is not needed.
+         *
+         * @param referenceTracking set to {@code true} to enable and {@code false} otherwise
+         */
+        public Builder referenceTracking(final boolean referenceTracking) {
+            this.referenceTracking = referenceTracking;
+            return this;
+        }
+
+        /**
          * Creates a {@code GryoMapper}.
          */
         public GryoMapper create() {
@@ -321,7 +356,7 @@ public final class GryoMapper implements Mapper<Kryo> {
                 });
             });
 
-            return new GryoMapper(serializationList);
+            return new GryoMapper(this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 0b6faab..e7bf636 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -45,6 +45,7 @@ public final class GryoPool {
 
     private Queue<GryoReader> gryoReaders;
     private Queue<GryoWriter> gryoWriters;
+    private Queue<Kryo> kryos;
     private GryoMapper mapper;
 
     public static GryoPool.Builder build() {
@@ -57,20 +58,13 @@ public final class GryoPool {
     private GryoPool() {
     }
 
-    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
-        this.mapper = gryoMapper;
-        if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
-            this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
-            for (int i = 0; i < poolSize; i++) {
-                this.gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
-            }
-        }
-        if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
-            this.gryoWriters = new LinkedBlockingQueue<>(poolSize);
-            for (int i = 0; i < poolSize; i++) {
-                this.gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
-            }
-        }
+    public GryoMapper getMapper() {
+        return mapper;
+    }
+
+    public Kryo takeKryo() {
+        final Kryo kryo = kryos.poll();
+        return null == kryo ? mapper.createMapper() : kryo;
     }
 
     public GryoReader takeReader() {
@@ -83,25 +77,63 @@ public final class GryoPool {
         return null == writer ? GryoWriter.build().mapper(mapper).create() : writer;
     }
 
+    public void offerKryo(final Kryo kryo) {
+        kryos.offer(kryo);
+    }
+
     public void offerReader(final GryoReader gryoReader) {
-        this.gryoReaders.offer(gryoReader);
+        gryoReaders.offer(gryoReader);
     }
 
     public void offerWriter(final GryoWriter gryoWriter) {
-        this.gryoWriters.offer(gryoWriter);
+        gryoWriters.offer(gryoWriter);
+    }
+
+    public <A> A readWithKryo(final Function<Kryo, A> kryoFunction) {
+        final Kryo kryo = takeKryo();
+        final A a = kryoFunction.apply(kryo);
+        offerKryo(kryo);
+        return a;
+    }
+
+    public void writeWithKryo(final Consumer<Kryo> kryoConsumer) {
+        final Kryo kryo = takeKryo();
+        kryoConsumer.accept(kryo);
+        offerKryo(kryo);
     }
 
     public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
-        final GryoReader gryoReader = this.takeReader();
+        final GryoReader gryoReader = takeReader();
         final A a = readerFunction.apply(gryoReader);
-        this.offerReader(gryoReader);
+        offerReader(gryoReader);
         return a;
     }
 
     public void doWithWriter(final Consumer<GryoWriter> writerFunction) {
-        final GryoWriter gryoWriter = this.takeWriter();
+        final GryoWriter gryoWriter = takeWriter();
         writerFunction.accept(gryoWriter);
-        this.offerWriter(gryoWriter);
+        offerWriter(gryoWriter);
+    }
+
+    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
+        this.mapper = gryoMapper;
+        if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
+            gryoReaders = new LinkedBlockingQueue<>(poolSize);
+            for (int i = 0; i < poolSize; i++) {
+                gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
+            }
+        }
+        if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
+            gryoWriters = new LinkedBlockingQueue<>(poolSize);
+            for (int i = 0; i < poolSize; i++) {
+                gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
+            }
+        }
+
+        kryos = new LinkedBlockingQueue<>(poolSize);
+        for (int i = 0; i < poolSize; i++) {
+            kryos.add(gryoMapper.createMapper());
+        }
     }
 
     ////
@@ -112,7 +144,6 @@ public final class GryoPool {
         private List<IoRegistry> ioRegistries = new ArrayList<>();
         private Type type = Type.READER_WRITER;
         private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
-        private Consumer<Kryo> kryoConsumer = null;
 
         /**
          * The {@code IoRegistry} class names to use for the {@code GryoPool}
@@ -170,17 +201,6 @@ public final class GryoPool {
         }
 
         /**
-         * A consumer to update all the {@link Kryo} instances for all {@code GryoReader} and {@code GryoWriter} instances.
-         *
-         * @param kryoConsumer the consumer
-         * @return the updated builder
-         */
-        public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
-            this.kryoConsumer = kryoConsumer;
-            return this;
-        }
-
-        /**
          * Create the {@code GryoPool} from this builder.
          *
          * @return the new pool
@@ -193,14 +213,6 @@ public final class GryoPool {
             if (null != this.gryoMapperConsumer)
                 this.gryoMapperConsumer.accept(mapper);
             gryoPool.createPool(this.poolSize, this.type, mapper.create());
-            if (null != this.kryoConsumer) {
-                for (final GryoReader reader : gryoPool.gryoReaders) {
-                    kryoConsumer.accept(reader.getKryo());
-                }
-                for (final GryoWriter writer : gryoPool.gryoWriters) {
-                    kryoConsumer.accept(writer.getKryo());
-                }
-            }
             return gryoPool;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index ebc0ebc..0080d68 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -67,10 +67,6 @@ public final class GryoReader implements GraphReader {
         this.batchSize = batchSize;
     }
 
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-
     /**
      * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
index 8ca8e51..d98b8c2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
@@ -51,10 +51,6 @@ public final class GryoWriter implements GraphWriter {
         this.kryo = gryoMapper.createMapper();
     }
 
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
index b500495..9bdec55 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
@@ -47,6 +47,27 @@ public class GryoMapperTest {
     }
 
     @Test
+    public void shouldSerializeWithoutRegistration() throws Exception {
+        final GryoMapper mapper = GryoMapper.build().registrationRequired(false).create();
+        final Kryo kryo = mapper.createMapper();
+        try (final OutputStream stream = new ByteArrayOutputStream()) {
+            final Output out = new Output(stream);
+            final IoX x = new IoX("x");
+            final IoY y = new IoY(100, 200);
+            kryo.writeClassAndObject(out, x);
+            kryo.writeClassAndObject(out, y);
+
+            try (final InputStream inputStream = new ByteArrayInputStream(out.toBytes())) {
+                final Input input = new Input(inputStream);
+                final IoX readX = (IoX) kryo.readClassAndObject(input);
+                final IoY readY = (IoY) kryo.readClassAndObject(input);
+                assertEquals(x, readX);
+                assertEquals(y, readY);
+            }
+        }
+    }
+
+    @Test
     public void shouldRegisterMultipleIoRegistryToSerialize() throws Exception {
         final GryoMapper mapper = GryoMapper.build()
                 .addRegistry(IoXIoRegistry.InstanceBased.getInstance())

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
index 4b508ea..d01a314 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -41,7 +41,7 @@ public final class GryoDeserializationStream extends DeserializationStream {
     @Override
     public <T> T readObject(final ClassTag<T> classTag) {
         try {
-            return this.gryoSerializer.getGryoPool().doWithReader { reader -> (T) reader.getKryo().readClassAndObject(this.input) }
+            return this.gryoSerializer.getGryoPool().readWithKryo { kryo -> (T) kryo.readClassAndObject(this.input) }
         } catch (final Throwable e) {
             if (e instanceof KryoException) {
                 final KryoException kryoException = (KryoException) e;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
index 33e809d..608dcca 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -40,7 +40,7 @@ public final class GryoSerializationStream extends SerializationStream {
 
     @Override
     public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
-        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
+        this.gryoSerializer.getGryoPool().writeWithKryo(kryo -> kryo.writeClassAndObject(this.output, t));
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7c409ef..f0b1147 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -77,29 +77,26 @@ public final class GryoSerializer extends Serializer {
                 ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
-                        builder.
-                                addCustom(SerializableWritable.class, new JavaSerializer()).
-                                addCustom(Tuple2.class, new JavaSerializer()).
-                                addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                                addCustom(HttpBroadcast.class, new JavaSerializer()).
-                                addCustom(PythonBroadcast.class, new JavaSerializer()).
-                                addCustom(BoxedUnit.class, new JavaSerializer()).
-                                addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
-                                addCustom(MessagePayload.class, new JavaSerializer()).
-                                addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                                addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                                addCustom(ViewPayload.class, new JavaSerializer()).
-                                addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                                addCustom(VertexWritable.class, new JavaSerializer()).
-                                addCustom(ObjectWritable.class, new JavaSerializer());
+                        builder.addCustom(SerializableWritable.class, new JavaSerializer())
+                            .addCustom(Tuple2.class, new JavaSerializer())
+                            .addCustom(CompressedMapStatus.class, new JavaSerializer())
+                            .addCustom(HttpBroadcast.class, new JavaSerializer())
+                            .addCustom(PythonBroadcast.class, new JavaSerializer())
+                            .addCustom(BoxedUnit.class, new JavaSerializer())
+                            .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
+                            .addCustom(MessagePayload.class, new JavaSerializer())
+                            .addCustom(ViewIncomingPayload.class, new JavaSerializer())
+                            .addCustom(ViewOutgoingPayload.class, new JavaSerializer())
+                            .addCustom(ViewPayload.class, new JavaSerializer())
+                            .addCustom(SerializableConfiguration.class, new JavaSerializer())
+                            .addCustom(VertexWritable.class, new JavaSerializer())
+                            .addCustom(ObjectWritable.class, new JavaSerializer())
+                            .referenceTracking(referenceTracking)
+                            .registrationRequired(registrationRequired);
                                 // add these as we find ClassNotFoundExceptions
                     } catch (final ClassNotFoundException e) {
                         throw new IllegalStateException(e);
                     }
-                }).
-                initializeKryo(kryo -> {
-                    kryo.setRegistrationRequired(registrationRequired);
-                    kryo.setReferences(referenceTracking);
                 }).create();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
index c0571c0..3f93c24 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -48,22 +48,22 @@ public final class GryoSerializerInstance extends SerializerInstance {
 
     @Override
     public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
-        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
+        this.gryoSerializer.getGryoPool().writeWithKryo(kryo -> kryo.writeClassAndObject(this.output, t));
         return ByteBuffer.wrap(this.output.getBuffer());
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
         this.input.setBuffer(byteBuffer.array());
-        return this.gryoSerializer.getGryoPool().doWithReader(reader -> (T) reader.getKryo().readClassAndObject(this.input));
+        return this.gryoSerializer.getGryoPool().readWithKryo(kryo -> (T) kryo.readClassAndObject(this.input));
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
         this.input.setBuffer(byteBuffer.array());
-        return this.gryoSerializer.getGryoPool().doWithReader(reader -> {
-            reader.getKryo().setClassLoader(classLoader);
-            return (T) reader.getKryo().readClassAndObject(this.input);
+        return this.gryoSerializer.getGryoPool().readWithKryo(kryo -> {
+            kryo.setClassLoader(classLoader);
+            return (T) kryo.readClassAndObject(this.input);
         });
     }