You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/13 20:27:09 UTC

incubator-tinkerpop git commit: lots optimizations a fixed a major bug in VertexWritable. GiraphComputeVertex uses very little memory now -- no more 'dobuble copies.' Also, everything is VertexWritable so it all chains together nicely. Added GryoPool whi

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master a5147c1f9 -> db5a2d208


lots optimizations a fixed a major bug in VertexWritable. GiraphComputeVertex uses very little memory now -- no more 'dobuble copies.' Also, everything is VertexWritable so it all chains together nicely. Added GryoPool which supports GryoReader and GryoWriter pools.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/db5a2d20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/db5a2d20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/db5a2d20

Branch: refs/heads/master
Commit: db5a2d208cd3b2e6a53157b2f407c8505132809d
Parents: a5147c1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Mar 13 13:27:05 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Mar 13 13:27:05 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     | 78 +++++++++++++++++
 .../computer/giraph/GiraphComputeVertex.java    | 89 ++------------------
 .../computer/giraph/GiraphGraphComputer.java    |  4 +-
 .../computer/giraph/GiraphMessenger.java        |  2 +-
 .../computer/giraph/GiraphWorkerContext.java    |  7 --
 .../computer/giraph/io/GiraphVertexReader.java  |  6 +-
 .../computer/giraph/io/GiraphVertexWriter.java  |  4 +-
 .../hadoop/structure/io/VertexWritable.java     | 54 ++++++++----
 .../io/gryo/VertexStreamIteratorTest.java       | 24 +++++-
 9 files changed, 150 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
new file mode 100644
index 0000000..3078b07
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.gryo;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoPool {
+
+    private final ConcurrentLinkedQueue<GryoReader> gryoReaders;
+    private final ConcurrentLinkedQueue<GryoWriter> gryoWriters;
+
+    public GryoPool() {
+        this.gryoReaders = new ConcurrentLinkedQueue<>();
+        this.gryoWriters = new ConcurrentLinkedQueue<>();
+    }
+
+    public synchronized GryoReader getReader() {
+        final GryoReader reader = this.gryoReaders.poll();
+        return (null == reader) ? GryoReader.build().create() : reader;
+    }
+
+    public synchronized GryoWriter getWriter() {
+        final GryoWriter writer = this.gryoWriters.poll();
+        return (null == writer) ? GryoWriter.build().create() : writer;
+    }
+
+    public void addReader(final GryoReader gryoReader) {
+        this.gryoReaders.offer(gryoReader);
+    }
+
+    public void addWriter(final GryoWriter gryoWriter) {
+        this.gryoWriters.offer(gryoWriter);
+    }
+
+    public <A> A doWithReaderWriter(final BiFunction<GryoReader, GryoWriter, A> readerWriterBiFunction) {
+        final GryoReader gryoReader = this.getReader();
+        final GryoWriter gryoWriter = this.getWriter();
+        final A a = readerWriterBiFunction.apply(gryoReader, gryoWriter);
+        this.addReader(gryoReader);
+        this.addWriter(gryoWriter);
+        return a;
+    }
+
+    public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
+        final GryoReader gryoReader = this.getReader();
+        final A a = readerFunction.apply(gryoReader);
+        this.addReader(gryoReader);
+        return a;
+    }
+
+    public <A> A doWithWriter(final Function<GryoWriter, A> writerFunction) {
+        final GryoWriter gryoWriter = this.getWriter();
+        final A a = writerFunction.apply(gryoWriter);
+        this.addWriter(gryoWriter);
+        return a;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index e15eb39..a0252c9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -21,54 +21,32 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerDataStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
 import org.apache.tinkerpop.gremlin.structure.strategy.StrategyVertex;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
-import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedVertex;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWritable, ObjectWritable> implements WrappedVertex<TinkerVertex> {
+public final class GiraphComputeVertex extends Vertex<LongWritable, VertexWritable, NullWritable, ObjectWritable> {
 
     //TODO: Dangerous that the underlying TinkerGraph Vertex can have edges written to it.
     //TODO: LongWritable as the key is not general enough -- ObjectWritable causes problems though :|
 
-    private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
-    private TinkerVertex tinkerVertex;
     private StrategyVertex wrappedVertex;
 
     public GiraphComputeVertex() {
     }
 
-    public GiraphComputeVertex(final org.apache.tinkerpop.gremlin.structure.Vertex vertex, final GryoReader gryoReader, final GryoWriter gryoWriter) {
-        this.tinkerVertex = this.generateTinkerVertexForm(vertex, gryoReader, gryoWriter);
-        this.tinkerVertex.graph().variables().set(VERTEX_ID, this.tinkerVertex.id());
-        this.initialize(new LongWritable(Long.valueOf(this.tinkerVertex.id().toString())), this.deflateTinkerVertex(gryoWriter), EmptyOutEdges.instance());
-    }
-
-    public TinkerVertex getBaseVertex() {
-        return this.tinkerVertex;
+    public GiraphComputeVertex(final VertexWritable vertexWritable) {
+        final VertexWritable newWritable = new VertexWritable();
+        newWritable.set(vertexWritable.get());
+        this.initialize(new LongWritable(Long.valueOf(newWritable.get().id().toString())), newWritable, EmptyOutEdges.instance());
     }
 
     @Override
@@ -76,10 +54,8 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
         final VertexProgram vertexProgram = ((GiraphWorkerContext) this.getWorkerContext()).getVertexProgram();
         final GiraphMemory memory = ((GiraphWorkerContext) this.getWorkerContext()).getMemory();
         final GiraphMessenger messenger = ((GiraphWorkerContext) this.getWorkerContext()).getMessenger(this, messages);
-        ///
-        if (null == this.tinkerVertex) inflateTinkerVertex();
         if (null == this.wrappedVertex)
-            this.wrappedVertex = ComputerDataStrategy.wrapVertex(this.tinkerVertex, vertexProgram);
+            this.wrappedVertex = ComputerDataStrategy.wrapVertex(this.getValue().get(), vertexProgram);
         ///////////
         if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject())
             vertexProgram.execute(this.wrappedVertex, messenger, memory);  // TODO provide a wrapper around TinkerVertex for Edge and non-ComputeKeys manipulation
@@ -90,55 +66,4 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
             this.wrappedVertex.property(VertexProperty.Cardinality.single, Constants.MAP_MEMORY, mapMemory);  // TODO: this is a "computer key"
         }
     }
-
-    ///////////////////////////////////////////////
-
-    private Text deflateTinkerVertex(final GryoWriter writer) {
-        try {
-            final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            writer.writeGraph(bos, this.tinkerVertex.graph());
-            bos.flush();
-            bos.close();
-            return new Text(bos.toByteArray());
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    private void inflateTinkerVertex() {
-        try {
-            final ByteArrayInputStream bis = new ByteArrayInputStream(this.getValue().getBytes());
-            final TinkerGraph tinkerGraph = TinkerGraph.open();
-            ((GiraphWorkerContext) this.getWorkerContext()).getReader().readGraph(bis, tinkerGraph);
-            bis.close();
-            this.tinkerVertex = (TinkerVertex) tinkerGraph.vertices(tinkerGraph.variables().get(VERTEX_ID).get()).next();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    private final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex, final GryoReader gryoReader, final GryoWriter gryoWriter) {
-        if (otherVertex instanceof TinkerVertex)
-            return (TinkerVertex) otherVertex;
-        else {
-            try {
-                final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                gryoWriter.writeVertex(bos, otherVertex, Direction.BOTH);
-                bos.flush();
-                bos.close();
-                final ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-                final TinkerGraph tinkerGraph = TinkerGraph.open();
-                final TinkerVertex tinkerVertex;
-                final Function<DetachedVertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(tinkerGraph, detachedVertex);
-                final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(tinkerGraph, detachedEdge);
-                try (InputStream in = new ByteArrayInputStream(bos.toByteArray())) {
-                    tinkerVertex = (TinkerVertex) gryoReader.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
-                }
-                bis.close();
-                return tinkerVertex;
-            } catch (final Exception e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 52d606c..96a9bd9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -40,6 +39,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVert
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MemoryMapReduce;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -86,7 +86,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
         this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
         this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
         this.giraphConfiguration.setClass("giraph.vertexIdClass", LongWritable.class, LongWritable.class);
-        this.giraphConfiguration.setClass("giraph.vertexValueClass", Text.class, Text.class);
+        this.giraphConfiguration.setClass("giraph.vertexValueClass", VertexWritable.class, VertexWritable.class);
         this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
         this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
index 56d87b7..c34f9c6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
@@ -53,7 +53,7 @@ public class GiraphMessenger<M> implements Messenger<M> {
     public void sendMessage(final MessageScope messageScope, final M message) {
         if (messageScope instanceof MessageScope.Local) {
             final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getBaseVertex());
+            final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getValue().get());
             final Direction direction = GiraphMessenger.getOppositeDirection(incidentTraversal);
             incidentTraversal.forEachRemaining(edge ->
                     this.giraphComputeVertex.sendMessage(

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
index d1a851c..91e7423 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -33,7 +32,6 @@ public final class GiraphWorkerContext extends WorkerContext {
     private VertexProgram<?> vertexProgram;
     private GiraphMemory memory;
     private GiraphMessenger messenger;
-    private GryoReader gryoReader;
 
     public GiraphWorkerContext() {
         // Giraph ReflectionUtils requires this to be public at minimum
@@ -43,7 +41,6 @@ public final class GiraphWorkerContext extends WorkerContext {
         this.vertexProgram = VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
         this.memory = new GiraphMemory(this, this.vertexProgram);
         this.messenger = new GiraphMessenger();
-        this.gryoReader = GryoReader.build().create();
     }
 
     public void postApplication() {
@@ -70,8 +67,4 @@ public final class GiraphWorkerContext extends WorkerContext {
         this.messenger.setCurrentVertex(giraphComputeVertex, messages);
         return this.messenger;
     }
-
-    public GryoReader getReader() {
-        return gryoReader;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
index 7be55ad..d240dad 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
 
 import java.io.IOException;
 
@@ -37,8 +35,6 @@ import java.io.IOException;
 public class GiraphVertexReader extends VertexReader {
 
     private RecordReader<NullWritable, VertexWritable> recordReader;
-    private GryoReader gryoReader = GryoReader.build().create();
-    private GryoWriter gryoWriter = GryoWriter.build().create();
 
     public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
         this.recordReader = recordReader;
@@ -56,7 +52,7 @@ public class GiraphVertexReader extends VertexReader {
 
     @Override
     public Vertex getCurrentVertex() throws IOException, InterruptedException {
-        return new GiraphComputeVertex(this.recordReader.getCurrentValue().get(), this.gryoReader, this.gryoWriter);
+        return new GiraphComputeVertex(this.recordReader.getCurrentValue());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
index 78a1344..4b64b0c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 public class GiraphVertexWriter extends VertexWriter {
     private final OutputFormat<NullWritable, VertexWritable> outputFormat;
     private RecordWriter<NullWritable, VertexWritable> recordWriter;
-    private final VertexWritable vertexWritable = new VertexWritable();
 
     public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
         this.outputFormat = outputFormat;
@@ -53,7 +52,6 @@ public class GiraphVertexWriter extends VertexWriter {
 
     @Override
     public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
-        this.vertexWritable.set(((GiraphComputeVertex) vertex).getBaseVertex());
-        this.recordWriter.write(NullWritable.get(), this.vertexWritable);
+        this.recordWriter.write(NullWritable.get(), ((GiraphComputeVertex) vertex).getValue());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index b8be3b3..a3fb2f7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -23,8 +23,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
@@ -42,8 +41,7 @@ import java.io.IOException;
 public final class VertexWritable implements Writable {
 
     private Vertex vertex;
-    private final GryoReader GRYO_READER = GryoReader.build().create();
-    private final GryoWriter GRYO_WRITER = GryoWriter.build().create();
+    private static final GryoPool GRYO_POOL = new GryoPool();
 
     public VertexWritable() {
 
@@ -64,22 +62,46 @@ public final class VertexWritable implements Writable {
 
     @Override
     public void readFields(final DataInput input) throws IOException {
-        this.vertex = null;
-        final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[WritableUtils.readVInt(input)]);
-        final Graph gLocal = TinkerGraph.open();
-        this.vertex = GRYO_READER.readVertex(inputStream, Direction.BOTH,
-                detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex),
-                detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge));
-
+        try {
+            this.vertex = null;
+            this.vertex = GRYO_POOL.doWithReader(gryoReader -> {
+                try {
+                    final ByteArrayInputStream inputStream = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
+                    final Graph gLocal = TinkerGraph.open();
+                    return gryoReader.readVertex(inputStream, Direction.BOTH,
+                            detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex),
+                            detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge));
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e);
+                }
+            });
+        } catch (IllegalStateException e) {
+            if (e.getCause() instanceof IOException)
+                throw (IOException) e.getCause();
+            else
+                throw e;
+        }
     }
 
     @Override
     public void write(final DataOutput output) throws IOException {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        GRYO_WRITER.writeVertex(outputStream, this.vertex, Direction.BOTH);
-        WritableUtils.writeVInt(output, outputStream.size());
-        output.write(outputStream.toByteArray());
-        outputStream.close();
+        try {
+            GRYO_POOL.doWithWriter(gryoWriter -> {
+                try {
+                    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                    gryoWriter.writeVertex(outputStream, this.vertex, Direction.BOTH);
+                    WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
+                    return null;
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e);
+                }
+            });
+        } catch (IllegalStateException e) {
+            if (e.getCause() instanceof IOException)
+                throw (IOException) e.getCause();
+            else
+                throw e;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
index 5dd9cb8..47266da 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
@@ -18,9 +18,19 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
 
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream;
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.FileInputStream;
 
 /**
@@ -36,12 +46,22 @@ public class VertexStreamIteratorTest {
         int counter = 0;
         long time = System.currentTimeMillis();
         while (iterator.hasNext()) {
-            if (counter++ % 1000 == 0) {
+            if (++counter % 1000 == 0) {
                 System.out.println("Read vertices: " + counter + "[ms:" + (System.currentTimeMillis() - time) + "]");
                 System.out.println(iterator.getProgress() + " -- progress");
                 time = System.currentTimeMillis();
             }
-            iterator.next();
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            //System.out.println("HERE:" +IteratorUtils.count(vertex.get().edges(Direction.BOTH)));
+            final GiraphComputeVertex vertex = new GiraphComputeVertex(iterator.next());
+            //System.out.println(vertex + "!!!!");
+            final DataOutputStream output = new DataOutputStream(out);
+            vertex.getValue().write(output);
+            output.flush();
+            //System.out.println("!!!" + out.size());
+            final VertexWritable v = new VertexWritable();
+            final ByteArrayInputStream inputStream = new ByteArrayInputStream(out.toByteArray());
+            v.readFields(new DataInputStream(inputStream));
         }
     }
 }