You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/30 00:07:07 UTC
[03/21] incubator-tinkerpop git commit: GryoPool constructor added,
GryoReader/Writer now have getKryo() public methods. GryoSerializer
now exists as a Spark serializer which will connect with the Graph and get
its registered serializers. Updated conf/pr
GryoPool constructor added, GryoReader/Writer now have getKryo() public methods. GryoSerializer now exists as a Spark serializer which will connect with the Graph and get its registered serializers. Updated conf/properties.files with GryoSerializer as the new default serializer instead of Sparks KryoSerializer. Uncommented GroupCountTest test which failed due to serialization issue in 3.0.x.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fde13ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fde13ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fde13ed5
Branch: refs/heads/thread-issue-tinkergraph
Commit: fde13ed5d9db8c18bd9e0d8eb4541755aa181697
Parents: a8b1439
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 13:03:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 13:03:11 2015 -0600
----------------------------------------------------------------------
.../gremlin/structure/io/gryo/GryoPool.java | 36 +++++-
.../gremlin/structure/io/gryo/GryoReader.java | 66 +++++-----
.../gremlin/structure/io/gryo/GryoWriter.java | 4 +
.../step/sideEffect/GroupCountTest.java | 1 -
hadoop-gremlin/conf/hadoop-gryo.properties | 2 +-
hadoop-gremlin/conf/hadoop-script.properties | 2 +-
.../io/gryo/GryoDeserializationStream.groovy | 63 ++++++++++
.../spark/process/computer/io/InputRDD.java | 2 +-
.../io/gryo/GryoDeserializationStream.java | 55 --------
.../io/gryo/GryoSerializationStream.java | 69 ----------
.../computer/io/gryo/GryoSerializer.java | 62 ---------
.../io/gryo/GryoSerializerInstance.java | 77 ------------
.../io/gryo/GryoSerializationStream.java | 59 +++++++++
.../spark/structure/io/gryo/GryoSerializer.java | 125 +++++++++++++++++++
.../io/gryo/GryoSerializerInstance.java | 87 +++++++++++++
.../computer/HadoopSparkGraphProvider.java | 2 +-
16 files changed, 407 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 426997e..275009f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -47,6 +48,22 @@ public final class GryoPool {
private Queue<GryoWriter> gryoWriters;
private final GryoMapper mapper;
+ public GryoPool(final Configuration conf, final Consumer<GryoMapper.Builder> builderConsumer, final Consumer<Kryo> kryoConsumer) {
+ final GryoMapper.Builder mapperBuilder = GryoMapper.build();
+ tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.<IoRegistry>emptyList())).forEach(mapperBuilder::addRegistry);
+ builderConsumer.accept(mapperBuilder);
+ // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
+ this.mapper = mapperBuilder.create();
+ this.createPool(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER, this.mapper);
+ for (final GryoReader reader : this.gryoReaders) {
+ kryoConsumer.accept(reader.getKryo());
+ }
+ for (final GryoWriter writer : this.gryoWriters) {
+ kryoConsumer.accept(writer.getKryo());
+ }
+
+ }
+
/**
* Create a pool of readers and writers from a {@code Configuration} object. There are two configuration keys
* expected: "gremlin.io.registry" which defines comma separated list of the fully qualified class names of
@@ -73,30 +90,37 @@ public final class GryoPool {
* Create a pool of a readers, writers or both of the specified size with an optional {@link IoRegistry} object
* which would allow custom serializers to be registered to the pool.
*
- * @param poolSize initial size of the pool.
- * @param type the type of pool.
+ * @param poolSize initial size of the pool.
+ * @param type the type of pool.
* @param registries a list of registries to assign to each {@link GryoReader} and {@link GryoWriter} instances.
*/
public GryoPool(final int poolSize, final Type type, final List<IoRegistry> registries) {
final GryoMapper.Builder mapperBuilder = GryoMapper.build();
registries.forEach(mapperBuilder::addRegistry);
-
// should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
- mapper = mapperBuilder.create();
+ this.mapper = mapperBuilder.create();
+ createPool(poolSize, type, mapper);
+ }
+
+ private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
- this.gryoReaders.add(GryoReader.build().mapper(mapper).create());
+ this.gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
}
}
if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
this.gryoWriters = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
- this.gryoWriters.add(GryoWriter.build().mapper(mapper).create());
+ this.gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
}
}
}
+ public GryoMapper getMapper() {
+ return this.mapper;
+ }
+
public GryoReader takeReader() {
final GryoReader reader = this.gryoReaders.poll();
return null == reader ? GryoReader.build().mapper(mapper).create() : reader;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index 234a04e..ebc0ebc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -18,24 +18,23 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.Host;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.shaded.kryo.io.Input;
import java.io.IOException;
@@ -68,12 +67,16 @@ public final class GryoReader implements GraphReader {
this.batchSize = batchSize;
}
+ public Kryo getKryo() {
+ return this.kryo;
+ }
+
/**
* Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
* {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
*
- * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
- * {@link GraphWriter#writeGraph(OutputStream, Graph)}.
+ * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
+ * {@link GraphWriter#writeGraph(OutputStream, Graph)}.
* @param graphToWriteTo the graph to write to when reading from the stream.
* @throws IOException
*/
@@ -81,7 +84,7 @@ public final class GryoReader implements GraphReader {
public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
// dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't
// have vertex labels in the output we can't do this single pass
- final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
+ final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
final AtomicLong counter = new AtomicLong(0);
final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
@@ -112,11 +115,11 @@ public final class GryoReader implements GraphReader {
* Read {@link Vertex} objects from output generated by any of the {@link GryoWriter} {@code writeVertex} or
* {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
*
- * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
- * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
- * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
- * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
- * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+ * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
+ * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+ * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
* @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
*/
@Override
@@ -131,8 +134,8 @@ public final class GryoReader implements GraphReader {
* Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
* {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
*
- * @param inputStream a stream containing at least a single vertex as defined by the accompanying
- * {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
+ * @param inputStream a stream containing at least a single vertex as defined by the accompanying
+ * {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
* @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
*/
@Override
@@ -144,10 +147,10 @@ public final class GryoReader implements GraphReader {
* Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
* {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
*
- * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
- * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
- * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
- * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+ * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
+ * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
+ * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+ * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
* @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
*/
@Override
@@ -163,8 +166,8 @@ public final class GryoReader implements GraphReader {
* Read an {@link Edge} from output generated by {@link GryoWriter#writeEdge(OutputStream, Edge)} or via
* an {@link Edge} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
*
- * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
- * {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
+ * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
+ * {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
* @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
*/
@Override
@@ -180,14 +183,14 @@ public final class GryoReader implements GraphReader {
* {@link GryoWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
* to {@link GryoWriter#writeObject(OutputStream, Object)}.
*
- * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
- * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
+ * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
+ * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
* @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
* {@link Host} object.
*/
@Override
- public VertexProperty readVertexProperty (final InputStream inputStream,
- final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
+ public VertexProperty readVertexProperty(final InputStream inputStream,
+ final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
final Input input = new Input(inputStream);
readHeader(input);
final Attachable<VertexProperty> attachable = kryo.readObject(input, DetachedVertexProperty.class);
@@ -198,8 +201,8 @@ public final class GryoReader implements GraphReader {
* Read a {@link Property} from output generated by {@link GryoWriter#writeProperty(OutputStream, Property)} or
* via an {@link Property} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
*
- * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
- * {@link GraphWriter#writeProperty(OutputStream, Property)} method.
+ * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
+ * {@link GraphWriter#writeProperty(OutputStream, Property)} method.
* @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
*/
@Override
@@ -215,7 +218,7 @@ public final class GryoReader implements GraphReader {
* {@inheritDoc}
*/
@Override
- public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException{
+ public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
return clazz.cast(this.kryo.readClassAndObject(new Input(inputStream)));
}
@@ -230,7 +233,8 @@ public final class GryoReader implements GraphReader {
kryo.readClassAndObject(input);
final Vertex v = vertexMaker.apply(starGraph.getStarVertex());
- if (edgeMaker != null) starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
+ if (edgeMaker != null)
+ starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
return v;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
index d98b8c2..8ca8e51 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
@@ -51,6 +51,10 @@ public final class GryoWriter implements GraphWriter {
this.kryo = gryoMapper.createMapper();
}
+ public Kryo getKryo() {
+ return this.kryo;
+ }
+
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
index 3bf232c..616daa2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
@@ -84,7 +84,6 @@ public abstract class GroupCountTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
- @Ignore // TODO: fix Spark integration
public void g_V_outXcreatedX_groupCountXxX_capXxX() {
final Traversal<Vertex, Map<Vertex, Long>> traversal = get_g_V_outXcreatedX_groupCountXxX_capXxX();
final Object lopId = convertToVertexId("lop");
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index e92d517..75fb11f 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -27,7 +27,7 @@ gremlin.hadoop.outputLocation=output
####################################
spark.master=local[4]
spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
# spark.kryo.registrationRequired=true
# spark.storage.memoryFraction=0.2
# spark.eventLog.enabled=true
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-script.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-script.properties b/hadoop-gremlin/conf/hadoop-script.properties
index 1099394..3b41598 100644
--- a/hadoop-gremlin/conf/hadoop-script.properties
+++ b/hadoop-gremlin/conf/hadoop-script.properties
@@ -28,7 +28,7 @@ gremlin.hadoop.outputLocation=output
####################################
spark.master=local[4]
spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
# spark.kryo.registrationRequired=true
# spark.storage.memoryFraction=0.2
# spark.eventLog.enabled=true
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
new file mode 100644
index 0000000..6c6296d
--- /dev/null
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
+
+import org.apache.spark.serializer.DeserializationStream
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader
+import org.apache.tinkerpop.shaded.kryo.KryoException
+import org.apache.tinkerpop.shaded.kryo.io.Input
+import scala.reflect.ClassTag
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoDeserializationStream extends DeserializationStream {
+
+ private final Input input;
+ private final GryoSerializerInstance gryoSerializer;
+
+ public GryoDeserializationStream(final GryoSerializerInstance gryoSerializer, final InputStream inputStream) {
+ this.gryoSerializer = gryoSerializer;
+ this.input = new Input(inputStream);
+ }
+
+ @Override
+ public <T> T readObject(final ClassTag<T> classTag) {
+ try {
+ final GryoReader gryoReader = this.gryoSerializer.getGryoPool().takeReader();
+ final T t = (T) gryoReader.getKryo().readClassAndObject(this.input);
+ this.gryoSerializer.getGryoPool().offerReader(gryoReader);
+ return t;
+ } catch (final Throwable e) {
+ if (e instanceof KryoException) {
+ final KryoException kryoException = (KryoException) e;
+ if (kryoException.getMessage().toLowerCase().contains("buffer underflow")) {
+ throw new EOFException();
+ }
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() {
+ this.input.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
index 19d79a8..291fcd3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -33,7 +33,7 @@ public interface InputRDD {
/**
* Read the graphRDD from the underlying graph system.
- * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
+ * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
* @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
* @return an adjacency list representation of the underlying graph system.
*/
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
deleted file mode 100644
index 1d8039a..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoDeserializationStream extends DeserializationStream {
-
- private final InputStream inputStream;
- private final GryoSerializerInstance serializer;
-
- public GryoDeserializationStream(final GryoSerializerInstance serializer, final InputStream inputStream) {
- this.serializer = serializer;
- this.inputStream = inputStream;
- }
-
- @Override
- public <T> T readObject(final ClassTag<T> classTag) {
- return (T) this.serializer.getKryo().readClassAndObject(new Input(this.inputStream));
- }
-
- @Override
- public void close() {
- try {
- this.inputStream.close();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
deleted file mode 100644
index f51444b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.SerializationStream;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializationStream extends SerializationStream {
-
- private final OutputStream outputStream;
- private final GryoSerializerInstance serializer;
-
- public GryoSerializationStream(final GryoSerializerInstance serializer, final OutputStream outputStream) {
- this.outputStream = outputStream;
- this.serializer = serializer;
- }
-
- @Override
- public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
- try {
- this.outputStream.write(this.serializer.serialize(t, classTag).array());
- this.outputStream.flush();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- return this;
- }
-
- @Override
- public void flush() {
- try {
- this.outputStream.flush();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public void close() {
- try {
- this.outputStream.close();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
deleted file mode 100644
index c2e99a9..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.SerializableWritable;
-import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
-import org.apache.spark.scheduler.CompressedMapStatus;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.util.SerializableConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
-import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializer extends Serializer {
- @Override
- public SerializerInstance newInstance() {
- return new GryoSerializerInstance(
- GryoMapper.build().
- addCustom(SerializableWritable.class, new JavaSerializer()).
- addCustom(Tuple2.class, new JavaSerializer()).
- addCustom(CompressedMapStatus.class, new JavaSerializer()).
- addCustom(HttpBroadcast.class, new JavaSerializer()).
- addCustom(PythonBroadcast.class, new JavaSerializer()).
- addCustom(MessagePayload.class, new JavaSerializer()).
- addCustom(ViewIncomingPayload.class, new JavaSerializer()).
- addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
- addCustom(ViewPayload.class, new JavaSerializer()).
- addCustom(SerializableConfiguration.class, new JavaSerializer()).
- addCustom(VertexWritable.class, new JavaSerializer()).
- addCustom(ObjectWritable.class, new JavaSerializer()).
- create().createMapper());
- // kryo.register(org.apache.spark.serializer.JavaIterableWrapperSerializer..MODULE$.wrapperClass(), new JavaIterableWrapperSerializer());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
deleted file mode 100644
index 9c74434..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
-import scala.reflect.ClassTag;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializerInstance extends SerializerInstance {
-
- private final Kryo kryo;
-
- public GryoSerializerInstance(final Kryo kryo) {
- this.kryo = kryo;
- }
-
- @Override
- public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
- final Output output = new Output(100000);
- this.kryo.writeClassAndObject(output, t);
- output.flush();
- return ByteBuffer.wrap(output.getBuffer());
- }
-
- @Override
- public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
- return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
- }
-
- @Override
- public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
- this.kryo.setClassLoader(classLoader);
- return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
- }
-
- @Override
- public SerializationStream serializeStream(final OutputStream outputStream) {
- return new GryoSerializationStream(this, outputStream);
- }
-
- @Override
- public DeserializationStream deserializeStream(final InputStream inputStream) {
- return new GryoDeserializationStream(this, inputStream);
- }
-
- public Kryo getKryo() {
- return this.kryo;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
new file mode 100644
index 0000000..6c1a164
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.OutputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializationStream extends SerializationStream {
+
+ private final Output output;
+ private final GryoSerializerInstance gryoSerializer;
+
+ public GryoSerializationStream(final GryoSerializerInstance gryoSerializer, final OutputStream outputStream) {
+ this.output = new Output(outputStream);
+ this.gryoSerializer = gryoSerializer;
+ }
+
+ @Override
+ public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
+ final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
+ writer.getKryo().writeClassAndObject(this.output, t);
+ this.gryoSerializer.getGryoPool().offerWriter(writer);
+ return this;
+ }
+
+ @Override
+ public void flush() {
+ this.output.flush();
+ }
+
+ @Override
+ public void close() {
+ this.output.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
new file mode 100644
index 0000000..ee16126
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.python.PythonBroadcast;
+import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.network.util.ByteUnit;
+import org.apache.spark.scheduler.CompressedMapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializer extends Serializer {
+ private final boolean referenceTracking;
+ private final boolean registrationRequired;
+ //private final Option<String> userRegistrator;
+ private final long bufferSizeKb;
+ private final int bufferSize;
+ private final int maxBufferSizeMb;
+ private final int maxBufferSize;
+
+ private final GryoPool gryoPool;
+
+ public GryoSerializer(final SparkConf sparkConfiguration) {
+ this.bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
+ if (this.bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
+ throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + this.bufferSizeKb + " mb.");
+ } else {
+ this.bufferSize = (int) ByteUnit.KiB.toBytes(this.bufferSizeKb);
+ this.maxBufferSizeMb = (int) sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
+ if (this.maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
+ throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + this.maxBufferSizeMb + " mb.");
+ } else {
+ this.maxBufferSize = (int) ByteUnit.MiB.toBytes(this.maxBufferSizeMb);
+ this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+ this.registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+ //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
+
+ }
+ }
+ this.gryoPool = new GryoPool(makeApacheConfiguration(sparkConfiguration), builder -> {
+ try {
+ builder.
+ addCustom(SerializableWritable.class, new JavaSerializer()).
+ addCustom(Tuple2.class, new JavaSerializer()).
+ addCustom(CompressedMapStatus.class, new JavaSerializer()).
+ addCustom(HttpBroadcast.class, new JavaSerializer()).
+ addCustom(PythonBroadcast.class, new JavaSerializer()).
+ addCustom(BoxedUnit.class, new JavaSerializer()).
+ addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
+ addCustom(MessagePayload.class, new JavaSerializer()).
+ addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+ addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+ addCustom(ViewPayload.class, new JavaSerializer()).
+ addCustom(SerializableConfiguration.class, new JavaSerializer()).
+ addCustom(VertexWritable.class, new JavaSerializer()).
+ addCustom(ObjectWritable.class, new JavaSerializer());
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }, kryo -> {
+ kryo.setRegistrationRequired(this.registrationRequired);
+ kryo.setReferences(this.referenceTracking);
+ });
+
+ }
+
+ public Output newOutput() {
+ return new Output(this.bufferSize, this.maxBufferSize);
+ }
+
+ public GryoPool getGryoPool() {
+ return this.gryoPool;
+ }
+
+ @Override
+ public SerializerInstance newInstance() {
+ return new GryoSerializerInstance(this);
+ }
+
+ private static Configuration makeApacheConfiguration(final SparkConf sparkConfiguration) {
+ final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+ apacheConfiguration.setDelimiterParsingDisabled(true);
+ for (final Tuple2<String, String> tuple : sparkConfiguration.getAll()) {
+ apacheConfiguration.setProperty(tuple._1(), tuple._2());
+ }
+ return apacheConfiguration;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
new file mode 100644
index 0000000..764d8f0
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializerInstance extends SerializerInstance {
+
+ private final GryoSerializer gryoSerializer;
+ private final Output output;
+ private final Input input;
+
+ public GryoSerializerInstance(final GryoSerializer gryoSerializer) {
+ this.gryoSerializer = gryoSerializer;
+ this.input = new Input();
+ this.output = gryoSerializer.newOutput();
+ }
+
+ @Override
+ public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
+ final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
+ writer.getKryo().writeClassAndObject(this.output, t);
+ this.output.flush();
+ this.gryoSerializer.getGryoPool().offerWriter(writer);
+ return ByteBuffer.wrap(this.output.getBuffer());
+ }
+
+ @Override
+ public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
+ this.input.setBuffer(byteBuffer.array());
+ final GryoReader reader = this.gryoSerializer.getGryoPool().takeReader();
+ final T t = (T) reader.getKryo().readClassAndObject(this.input);
+ this.gryoSerializer.getGryoPool().offerReader(reader);
+ return t;
+ }
+
+ @Override
+ public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
+ return this.deserialize(byteBuffer, classTag);
+ }
+
+ @Override
+ public SerializationStream serializeStream(final OutputStream outputStream) {
+ return new GryoSerializationStream(this, outputStream);
+ }
+
+ @Override
+ public DeserializationStream deserializeStream(final InputStream inputStream) {
+ return new GryoDeserializationStream(this, inputStream);
+ }
+
+ public GryoPool getGryoPool() {
+ return this.gryoSerializer.getGryoPool();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 7ed741b..916a3d7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -115,7 +115,7 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
/// spark configuration
put("spark.master", "local[4]");
// put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- put("spark.serializer","org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo.GryoSerializer");
+ put("spark.serializer","org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
// put("spark.kryo.registrationRequired",true);
}};
}