You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/13 20:27:09 UTC
incubator-tinkerpop git commit: lots of optimizations and a fix for a major
bug in VertexWritable. GiraphComputeVertex uses very little memory now -- no
more 'double copies.' Also,
everything is VertexWritable so it all chains together nicely. Added GryoPool
which supports GryoReader and GryoWriter pools.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master a5147c1f9 -> db5a2d208
Lots of optimizations and a fix for a major bug in VertexWritable. GiraphComputeVertex uses very little memory now -- no more 'double copies.' Also, everything is VertexWritable so it all chains together nicely. Added GryoPool which supports GryoReader and GryoWriter pools.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/db5a2d20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/db5a2d20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/db5a2d20
Branch: refs/heads/master
Commit: db5a2d208cd3b2e6a53157b2f407c8505132809d
Parents: a5147c1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Mar 13 13:27:05 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Mar 13 13:27:05 2015 -0600
----------------------------------------------------------------------
.../gremlin/structure/io/gryo/GryoPool.java | 78 +++++++++++++++++
.../computer/giraph/GiraphComputeVertex.java | 89 ++------------------
.../computer/giraph/GiraphGraphComputer.java | 4 +-
.../computer/giraph/GiraphMessenger.java | 2 +-
.../computer/giraph/GiraphWorkerContext.java | 7 --
.../computer/giraph/io/GiraphVertexReader.java | 6 +-
.../computer/giraph/io/GiraphVertexWriter.java | 4 +-
.../hadoop/structure/io/VertexWritable.java | 54 ++++++++----
.../io/gryo/VertexStreamIteratorTest.java | 24 +++++-
9 files changed, 150 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
new file mode 100644
index 0000000..3078b07
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.io.gryo;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * A thread-safe pool of reusable {@link GryoReader} and {@link GryoWriter} instances. The backing queues are
+ * lock-free, so borrowing ({@code get*}) and returning ({@code add*}) need no monitor; {@code get*} builds a
+ * new instance via the builder when the pool is empty. The {@code doWith*} helpers return their borrowed instances
+ * only when the callback completes normally; if it throws, those instances are dropped instead of being reused.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoPool {
+
+ private final ConcurrentLinkedQueue<GryoReader> gryoReaders = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<GryoWriter> gryoWriters = new ConcurrentLinkedQueue<>();
+
+ public GryoReader getReader() {
+ final GryoReader reader = this.gryoReaders.poll();
+ return (null == reader) ? GryoReader.build().create() : reader;
+ }
+
+ public GryoWriter getWriter() {
+ final GryoWriter writer = this.gryoWriters.poll();
+ return (null == writer) ? GryoWriter.build().create() : writer;
+ }
+
+ public void addReader(final GryoReader gryoReader) {
+ this.gryoReaders.offer(gryoReader);
+ }
+
+ public void addWriter(final GryoWriter gryoWriter) {
+ this.gryoWriters.offer(gryoWriter);
+ }
+
+ public <A> A doWithReaderWriter(final BiFunction<GryoReader, GryoWriter, A> readerWriterBiFunction) {
+ final GryoReader gryoReader = this.getReader();
+ final GryoWriter gryoWriter = this.getWriter();
+ final A a = readerWriterBiFunction.apply(gryoReader, gryoWriter);
+ this.addReader(gryoReader);
+ this.addWriter(gryoWriter);
+ return a;
+ }
+
+ public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
+ final GryoReader gryoReader = this.getReader();
+ final A a = readerFunction.apply(gryoReader);
+ this.addReader(gryoReader);
+ return a;
+ }
+
+ public <A> A doWithWriter(final Function<GryoWriter, A> writerFunction) {
+ final GryoWriter gryoWriter = this.getWriter();
+ final A a = writerFunction.apply(gryoWriter);
+ this.addWriter(gryoWriter);
+ return a;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index e15eb39..a0252c9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -21,54 +21,32 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerDataStrategy;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.strategy.StrategyVertex;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
-import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedVertex;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.function.Function;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWritable, ObjectWritable> implements WrappedVertex<TinkerVertex> {
+public final class GiraphComputeVertex extends Vertex<LongWritable, VertexWritable, NullWritable, ObjectWritable> {
//TODO: Dangerous that the underlying TinkerGraph Vertex can have edges written to it.
//TODO: LongWritable as the key is not general enough -- ObjectWritable causes problems though :|
- private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
- private TinkerVertex tinkerVertex;
private StrategyVertex wrappedVertex;
public GiraphComputeVertex() {
}
- public GiraphComputeVertex(final org.apache.tinkerpop.gremlin.structure.Vertex vertex, final GryoReader gryoReader, final GryoWriter gryoWriter) {
- this.tinkerVertex = this.generateTinkerVertexForm(vertex, gryoReader, gryoWriter);
- this.tinkerVertex.graph().variables().set(VERTEX_ID, this.tinkerVertex.id());
- this.initialize(new LongWritable(Long.valueOf(this.tinkerVertex.id().toString())), this.deflateTinkerVertex(gryoWriter), EmptyOutEdges.instance());
- }
-
- public TinkerVertex getBaseVertex() {
- return this.tinkerVertex;
+ public GiraphComputeVertex(final VertexWritable vertexWritable) {
+ final VertexWritable newWritable = new VertexWritable();
+ newWritable.set(vertexWritable.get());
+ this.initialize(new LongWritable(Long.valueOf(newWritable.get().id().toString())), newWritable, EmptyOutEdges.instance());
}
@Override
@@ -76,10 +54,8 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
final VertexProgram vertexProgram = ((GiraphWorkerContext) this.getWorkerContext()).getVertexProgram();
final GiraphMemory memory = ((GiraphWorkerContext) this.getWorkerContext()).getMemory();
final GiraphMessenger messenger = ((GiraphWorkerContext) this.getWorkerContext()).getMessenger(this, messages);
- ///
- if (null == this.tinkerVertex) inflateTinkerVertex();
if (null == this.wrappedVertex)
- this.wrappedVertex = ComputerDataStrategy.wrapVertex(this.tinkerVertex, vertexProgram);
+ this.wrappedVertex = ComputerDataStrategy.wrapVertex(this.getValue().get(), vertexProgram);
///////////
if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject())
vertexProgram.execute(this.wrappedVertex, messenger, memory); // TODO provide a wrapper around TinkerVertex for Edge and non-ComputeKeys manipulation
@@ -90,55 +66,4 @@ public final class GiraphComputeVertex extends Vertex<LongWritable, Text, NullWr
this.wrappedVertex.property(VertexProperty.Cardinality.single, Constants.MAP_MEMORY, mapMemory); // TODO: this is a "computer key"
}
}
-
- ///////////////////////////////////////////////
-
- private Text deflateTinkerVertex(final GryoWriter writer) {
- try {
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- writer.writeGraph(bos, this.tinkerVertex.graph());
- bos.flush();
- bos.close();
- return new Text(bos.toByteArray());
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- private void inflateTinkerVertex() {
- try {
- final ByteArrayInputStream bis = new ByteArrayInputStream(this.getValue().getBytes());
- final TinkerGraph tinkerGraph = TinkerGraph.open();
- ((GiraphWorkerContext) this.getWorkerContext()).getReader().readGraph(bis, tinkerGraph);
- bis.close();
- this.tinkerVertex = (TinkerVertex) tinkerGraph.vertices(tinkerGraph.variables().get(VERTEX_ID).get()).next();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- private final TinkerVertex generateTinkerVertexForm(final org.apache.tinkerpop.gremlin.structure.Vertex otherVertex, final GryoReader gryoReader, final GryoWriter gryoWriter) {
- if (otherVertex instanceof TinkerVertex)
- return (TinkerVertex) otherVertex;
- else {
- try {
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- gryoWriter.writeVertex(bos, otherVertex, Direction.BOTH);
- bos.flush();
- bos.close();
- final ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- final TinkerGraph tinkerGraph = TinkerGraph.open();
- final TinkerVertex tinkerVertex;
- final Function<DetachedVertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexMaker = detachedVertex -> DetachedVertex.addTo(tinkerGraph, detachedVertex);
- final Function<DetachedEdge, Edge> edgeMaker = detachedEdge -> DetachedEdge.addTo(tinkerGraph, detachedEdge);
- try (InputStream in = new ByteArrayInputStream(bos.toByteArray())) {
- tinkerVertex = (TinkerVertex) gryoReader.readVertex(in, Direction.BOTH, vertexMaker, edgeMaker);
- }
- bis.close();
- return tinkerVertex;
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 52d606c..96a9bd9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -40,6 +39,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVert
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MemoryMapReduce;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -86,7 +86,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
this.giraphConfiguration.setClass("giraph.vertexIdClass", LongWritable.class, LongWritable.class);
- this.giraphConfiguration.setClass("giraph.vertexValueClass", Text.class, Text.class);
+ this.giraphConfiguration.setClass("giraph.vertexValueClass", VertexWritable.class, VertexWritable.class);
this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
index 56d87b7..c34f9c6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
@@ -53,7 +53,7 @@ public class GiraphMessenger<M> implements Messenger<M> {
public void sendMessage(final MessageScope messageScope, final M message) {
if (messageScope instanceof MessageScope.Local) {
final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getBaseVertex());
+ final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getValue().get());
final Direction direction = GiraphMessenger.getOppositeDirection(incidentTraversal);
incidentTraversal.forEachRemaining(edge ->
this.giraphComputeVertex.sendMessage(
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
index d1a851c..91e7423 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -33,7 +32,6 @@ public final class GiraphWorkerContext extends WorkerContext {
private VertexProgram<?> vertexProgram;
private GiraphMemory memory;
private GiraphMessenger messenger;
- private GryoReader gryoReader;
public GiraphWorkerContext() {
// Giraph ReflectionUtils requires this to be public at minimum
@@ -43,7 +41,6 @@ public final class GiraphWorkerContext extends WorkerContext {
this.vertexProgram = VertexProgram.createVertexProgram(ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()));
this.memory = new GiraphMemory(this, this.vertexProgram);
this.messenger = new GiraphMessenger();
- this.gryoReader = GryoReader.build().create();
}
public void postApplication() {
@@ -70,8 +67,4 @@ public final class GiraphWorkerContext extends WorkerContext {
this.messenger.setCurrentVertex(giraphComputeVertex, messages);
return this.messenger;
}
-
- public GryoReader getReader() {
- return gryoReader;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
index 7be55ad..d240dad 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import java.io.IOException;
@@ -37,8 +35,6 @@ import java.io.IOException;
public class GiraphVertexReader extends VertexReader {
private RecordReader<NullWritable, VertexWritable> recordReader;
- private GryoReader gryoReader = GryoReader.build().create();
- private GryoWriter gryoWriter = GryoWriter.build().create();
public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
this.recordReader = recordReader;
@@ -56,7 +52,7 @@ public class GiraphVertexReader extends VertexReader {
@Override
public Vertex getCurrentVertex() throws IOException, InterruptedException {
- return new GiraphComputeVertex(this.recordReader.getCurrentValue().get(), this.gryoReader, this.gryoWriter);
+ return new GiraphComputeVertex(this.recordReader.getCurrentValue());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
index 78a1344..4b64b0c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
@@ -35,7 +35,6 @@ import java.io.IOException;
public class GiraphVertexWriter extends VertexWriter {
private final OutputFormat<NullWritable, VertexWritable> outputFormat;
private RecordWriter<NullWritable, VertexWritable> recordWriter;
- private final VertexWritable vertexWritable = new VertexWritable();
public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
this.outputFormat = outputFormat;
@@ -53,7 +52,6 @@ public class GiraphVertexWriter extends VertexWriter {
@Override
public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
- this.vertexWritable.set(((GiraphComputeVertex) vertex).getBaseVertex());
- this.recordWriter.write(NullWritable.get(), this.vertexWritable);
+ this.recordWriter.write(NullWritable.get(), ((GiraphComputeVertex) vertex).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index b8be3b3..a3fb2f7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -23,8 +23,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
@@ -42,8 +41,7 @@ import java.io.IOException;
public final class VertexWritable implements Writable {
private Vertex vertex;
- private final GryoReader GRYO_READER = GryoReader.build().create();
- private final GryoWriter GRYO_WRITER = GryoWriter.build().create();
+ private static final GryoPool GRYO_POOL = new GryoPool();
public VertexWritable() {
@@ -64,22 +62,46 @@ public final class VertexWritable implements Writable {
@Override
public void readFields(final DataInput input) throws IOException {
- this.vertex = null;
- final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[WritableUtils.readVInt(input)]);
- final Graph gLocal = TinkerGraph.open();
- this.vertex = GRYO_READER.readVertex(inputStream, Direction.BOTH,
- detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex),
- detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge));
-
+ try {
+ this.vertex = null;
+ this.vertex = GRYO_POOL.doWithReader(gryoReader -> {
+ try {
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
+ final Graph gLocal = TinkerGraph.open();
+ return gryoReader.readVertex(inputStream, Direction.BOTH,
+ detachedVertex -> DetachedVertex.addTo(gLocal, detachedVertex),
+ detachedEdge -> DetachedEdge.addTo(gLocal, detachedEdge));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ } catch (IllegalStateException e) {
+ if (e.getCause() instanceof IOException)
+ throw (IOException) e.getCause();
+ else
+ throw e;
+ }
}
@Override
public void write(final DataOutput output) throws IOException {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- GRYO_WRITER.writeVertex(outputStream, this.vertex, Direction.BOTH);
- WritableUtils.writeVInt(output, outputStream.size());
- output.write(outputStream.toByteArray());
- outputStream.close();
+ try {
+ GRYO_POOL.doWithWriter(gryoWriter -> {
+ try {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ gryoWriter.writeVertex(outputStream, this.vertex, Direction.BOTH);
+ WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
+ return null;
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ } catch (IllegalStateException e) {
+ if (e.getCause() instanceof IOException)
+ throw (IOException) e.getCause();
+ else
+ throw e;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/db5a2d20/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
index 5dd9cb8..47266da 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/VertexStreamIteratorTest.java
@@ -18,9 +18,19 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
+import com.sun.xml.internal.messaging.saaj.util.ByteInputStream;
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.FileInputStream;
/**
@@ -36,12 +46,22 @@ public class VertexStreamIteratorTest {
int counter = 0;
long time = System.currentTimeMillis();
while (iterator.hasNext()) {
- if (counter++ % 1000 == 0) {
+ if (++counter % 1000 == 0) {
System.out.println("Read vertices: " + counter + "[ms:" + (System.currentTimeMillis() - time) + "]");
System.out.println(iterator.getProgress() + " -- progress");
time = System.currentTimeMillis();
}
- iterator.next();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ //System.out.println("HERE:" +IteratorUtils.count(vertex.get().edges(Direction.BOTH)));
+ final GiraphComputeVertex vertex = new GiraphComputeVertex(iterator.next());
+ //System.out.println(vertex + "!!!!");
+ final DataOutputStream output = new DataOutputStream(out);
+ vertex.getValue().write(output);
+ output.flush();
+ //System.out.println("!!!" + out.size());
+ final VertexWritable v = new VertexWritable();
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(out.toByteArray());
+ v.readFields(new DataInputStream(inputStream));
}
}
}