You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/29 19:49:28 UTC

[01/14] incubator-tinkerpop git commit: different errors in master/. I think we are closer on 3.1.0 than with 3.0.2.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 337323887 -> 3abdb2585


different errors in master/. I think we are closer on 3.1.0 than with 3.0.2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9110a1e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9110a1e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9110a1e7

Branch: refs/heads/master
Commit: 9110a1e707d54da903ecb21d073d2d877e864e77
Parents: c6de3bf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Sep 23 18:51:27 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Sep 23 18:51:27 2015 -0600

----------------------------------------------------------------------
 .../io/gryo/GryoDeserializationStream.java      | 55 ++++++++++++++
 .../io/gryo/GryoSerializationStream.java        | 70 ++++++++++++++++++
 .../computer/io/gryo/GryoSerializer.java        | 62 ++++++++++++++++
 .../io/gryo/GryoSerializerInstance.java         | 76 ++++++++++++++++++++
 .../computer/HadoopSparkGraphProvider.java      |  3 +-
 5 files changed, 265 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9110a1e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
new file mode 100644
index 0000000..f5e36f7
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import scala.reflect.ClassTag;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoDeserializationStream extends DeserializationStream {
+
+    private final InputStream inputStream;
+    private final GryoSerializerInstance serializer;
+
+    public GryoDeserializationStream(final GryoSerializerInstance serializer, final InputStream inputStream) {
+        this.serializer = serializer;
+        this.inputStream = inputStream;
+    }
+
+    @Override
+    public <T> T readObject(final ClassTag<T> classTag) {
+        return (T) this.serializer.getKryo().readClassAndObject(new Input(this.inputStream));
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.inputStream.close();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9110a1e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
new file mode 100644
index 0000000..e5981a0
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
+
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import scala.reflect.ClassTag;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoSerializationStream extends SerializationStream {
+
+    private final OutputStream outputStream;
+    private final SerializerInstance serializer;
+
+    public GryoSerializationStream(final GryoSerializerInstance serializer, final OutputStream outputStream) {
+        this.outputStream = outputStream;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
+        try {
+            this.outputStream.write(this.serializer.serialize(t, classTag).array());
+            this.outputStream.flush();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return this;
+    }
+
+    @Override
+    public void flush() {
+        try {
+            this.outputStream.flush();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.outputStream.close();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9110a1e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
new file mode 100644
index 0000000..c7c1468
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
+
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.api.python.PythonBroadcast;
+import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.scheduler.CompressedMapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoSerializer extends Serializer {
+    @Override
+    public SerializerInstance newInstance() {
+        return new GryoSerializerInstance(
+                GryoMapper.build().
+                        addCustom(SerializableWritable.class, new JavaSerializer()).
+                        addCustom(Tuple2.class, new JavaSerializer()).
+                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
+                        addCustom(HttpBroadcast.class, new JavaSerializer()).
+                        addCustom(PythonBroadcast.class, new JavaSerializer()).
+                        addCustom(MessagePayload.class, new JavaSerializer()).
+                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+                        addCustom(ViewPayload.class, new JavaSerializer()).
+                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
+                        addCustom(VertexWritable.class, new JavaSerializer()).
+                        addCustom(ObjectWritable.class, new JavaSerializer()).
+                create().createMapper());
+        // kryo.register(org.apache.spark.serializer.JavaIterableWrapperSerializer..MODULE$.wrapperClass(), new JavaIterableWrapperSerializer());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9110a1e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
new file mode 100644
index 0000000..212abe2
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoSerializerInstance extends SerializerInstance {
+
+    private final Kryo kryo;
+
+    public GryoSerializerInstance(final Kryo kryo) {
+        this.kryo = kryo;
+    }
+
+    @Override
+    public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
+        final Output output = new Output(100000);
+        this.kryo.writeClassAndObject(output, t);
+        return ByteBuffer.wrap(output.getBuffer());
+    }
+
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
+        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
+    }
+
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
+        this.kryo.setClassLoader(classLoader);
+        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
+    }
+
+    @Override
+    public SerializationStream serializeStream(final OutputStream outputStream) {
+        return new GryoSerializationStream(this, outputStream);
+    }
+
+    @Override
+    public DeserializationStream deserializeStream(final InputStream inputStream) {
+        return new GryoDeserializationStream(this, inputStream);
+    }
+
+    public Kryo getKryo() {
+        return this.kryo;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9110a1e7/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index c7642c8..7ed741b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -114,7 +114,8 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
             put("mapreduce.job.reduces", 4);
             /// spark configuration
             put("spark.master", "local[4]");
-            put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+            // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo.GryoSerializer");
             // put("spark.kryo.registrationRequired",true);
         }};
     }


[14/14] incubator-tinkerpop git commit: Merge branch 'spark-gryo-tp31'

Posted by ok...@apache.org.
Merge branch 'spark-gryo-tp31'


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3abdb258
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3abdb258
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3abdb258

Branch: refs/heads/master
Commit: 3abdb258592e1039f0d8af80383612a8f8645be0
Parents: 3373238 c9b0a83
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 11:49:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 11:49:11 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   4 +-
 docs/src/implementations.asciidoc               |   2 +-
 .../gremlin/structure/io/gryo/GryoMapper.java   |  45 +++-
 .../gremlin/structure/io/gryo/GryoPool.java     | 229 +++++++++++++------
 .../gremlin/structure/io/gryo/GryoReader.java   |  62 ++---
 .../structure/io/gryo/GryoMapperTest.java       |  21 ++
 .../gremlin/structure/io/gryo/GryoPoolTest.java |  19 +-
 .../step/sideEffect/GroupCountTest.java         |   1 -
 hadoop-gremlin/conf/hadoop-gryo.properties      |   2 +-
 hadoop-gremlin/conf/hadoop-script.properties    |   2 +-
 .../hadoop/structure/io/HadoopPools.java        |  14 +-
 spark-gremlin/pom.xml                           |   2 +-
 .../io/gryo/GryoDeserializationStream.groovy    |  60 +++++
 .../process/computer/SparkGraphComputer.java    |   8 -
 .../spark/process/computer/io/InputRDD.java     |   2 +-
 .../io/gryo/GryoSerializationStream.java        |  56 +++++
 .../spark/structure/io/gryo/GryoSerializer.java | 124 ++++++++++
 .../io/gryo/GryoSerializerInstance.java         |  83 +++++++
 .../computer/HadoopSparkGraphProvider.java      |   5 +-
 19 files changed, 603 insertions(+), 138 deletions(-)
----------------------------------------------------------------------



[11/14] incubator-tinkerpop git commit: using the doWithXXX() methods of GryoPool and simplified the GryoSerializer.

Posted by ok...@apache.org.
using the doWithXXX() methods of GryoPool and simplified the GryoSerializer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c5827d2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c5827d2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c5827d2d

Branch: refs/heads/master
Commit: c5827d2dfbea56f9f7a849883d2ee0e7b85c21ca
Parents: dfd78ef
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 17:03:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 17:03:14 2015 -0600

----------------------------------------------------------------------
 .../io/gryo/GryoDeserializationStream.groovy      |  6 +-----
 .../io/gryo/GryoSerializationStream.java          |  5 +----
 .../structure/io/gryo/GryoSerializerInstance.java | 18 +++++++-----------
 3 files changed, 9 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c5827d2d/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
index b212dcf..4b508ea 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -20,7 +20,6 @@
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
 
 import org.apache.spark.serializer.DeserializationStream
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader
 import org.apache.tinkerpop.shaded.kryo.KryoException
 import org.apache.tinkerpop.shaded.kryo.io.Input
 import scala.reflect.ClassTag
@@ -42,10 +41,7 @@ public final class GryoDeserializationStream extends DeserializationStream {
     @Override
     public <T> T readObject(final ClassTag<T> classTag) {
         try {
-            final GryoReader gryoReader = this.gryoSerializer.getGryoPool().takeReader();
-            final T t = (T) gryoReader.getKryo().readClassAndObject(this.input);
-            this.gryoSerializer.getGryoPool().offerReader(gryoReader);
-            return t;
+            return this.gryoSerializer.getGryoPool().doWithReader { reader -> (T) reader.getKryo().readClassAndObject(this.input) }
         } catch (final Throwable e) {
             if (e instanceof KryoException) {
                 final KryoException kryoException = (KryoException) e;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c5827d2d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
index 6c1a164..33e809d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -20,7 +20,6 @@
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
 
 import org.apache.spark.serializer.SerializationStream;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import scala.reflect.ClassTag;
 
@@ -41,9 +40,7 @@ public final class GryoSerializationStream extends SerializationStream {
 
     @Override
     public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
-        final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
-        writer.getKryo().writeClassAndObject(this.output, t);
-        this.gryoSerializer.getGryoPool().offerWriter(writer);
+        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c5827d2d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
index 764d8f0..c0571c0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -23,8 +23,6 @@ import org.apache.spark.serializer.DeserializationStream;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import scala.reflect.ClassTag;
@@ -50,25 +48,23 @@ public final class GryoSerializerInstance extends SerializerInstance {
 
     @Override
     public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
-        final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
-        writer.getKryo().writeClassAndObject(this.output, t);
-        this.output.flush();
-        this.gryoSerializer.getGryoPool().offerWriter(writer);
+        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
         return ByteBuffer.wrap(this.output.getBuffer());
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
         this.input.setBuffer(byteBuffer.array());
-        final GryoReader reader = this.gryoSerializer.getGryoPool().takeReader();
-        final T t = (T) reader.getKryo().readClassAndObject(this.input);
-        this.gryoSerializer.getGryoPool().offerReader(reader);
-        return t;
+        return this.gryoSerializer.getGryoPool().doWithReader(reader -> (T) reader.getKryo().readClassAndObject(this.input));
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
-        return this.deserialize(byteBuffer, classTag);
+        this.input.setBuffer(byteBuffer.array());
+        return this.gryoSerializer.getGryoPool().doWithReader(reader -> {
+            reader.getKryo().setClassLoader(classLoader);
+            return (T) reader.getKryo().readClassAndObject(this.input);
+        });
     }
 
     @Override


[10/14] incubator-tinkerpop git commit: minor tweaks.

Posted by ok...@apache.org.
minor tweaks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/dfd78efa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/dfd78efa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/dfd78efa

Branch: refs/heads/master
Commit: dfd78efa2853db0050a3974b7aebdd556bac0b95
Parents: b4a009c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:59:12 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:59:12 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                                                | 2 +-
 .../spark/structure/io/gryo/GryoDeserializationStream.groovy      | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/dfd78efa/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4914b1a..01298d3 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-* Added `GryoSerializer` which is a Spark `Serializer` and is the recommend serializer. Handles `Graph` and `GryoMapper` registries.
+* Added `GryoSerializer` as the new recommended Spark `Serializer`. Handles `Graph` and `GryoMapper` registries.
 * `GryoPool` now makes use of `GryoPool.Builder` for its construction.
 * Bumped to Apache Hadoop 2.7.1.
 * Bumped to Apache Giraph 1.1.0.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/dfd78efa/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
index 6c6296d..b212dcf 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -32,6 +32,7 @@ public final class GryoDeserializationStream extends DeserializationStream {
 
     private final Input input;
     private final GryoSerializerInstance gryoSerializer;
+    private static final String BUFFER_UNDERFLOW = "buffer underflow";
 
     public GryoDeserializationStream(final GryoSerializerInstance gryoSerializer, final InputStream inputStream) {
         this.gryoSerializer = gryoSerializer;
@@ -48,7 +49,7 @@ public final class GryoDeserializationStream extends DeserializationStream {
         } catch (final Throwable e) {
             if (e instanceof KryoException) {
                 final KryoException kryoException = (KryoException) e;
-                if (kryoException.getMessage().toLowerCase().contains("buffer underflow")) {
+                if (kryoException.getMessage().toLowerCase().contains(BUFFER_UNDERFLOW)) {
                     throw new EOFException();
                 }
             }


[05/14] incubator-tinkerpop git commit: GryoPool now uses a Builder pattern. The old constructors are still there. I don't know if GryoPool is considered a 'public facing class'... if not, let's kill the GryoPool constructors; otherwise, deprecate them. HadoopSpark

Posted by ok...@apache.org.
GryoPool now uses a Builder pattern. The old constructors are still there. I don't know if GryoPool is considered a 'public facing class'... if not, let's kill the GryoPool constructors; otherwise, deprecate them. HadoopSparkGraphProvider has kryo registration set to true so we can fish out any unregistered Spark classes. So far, full test suite passes so our current registrations are good.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b32263e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b32263e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b32263e0

Branch: refs/heads/master
Commit: b32263e00e4a6ba281151fbb640eaa4105cbe92c
Parents: a7294ea
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 15:54:08 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 15:54:08 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     | 80 +++++++++++++++-----
 .../spark/structure/io/gryo/GryoSerializer.java | 80 ++++++++++----------
 .../computer/HadoopSparkGraphProvider.java      |  4 +-
 3 files changed, 105 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 275009f..97884bb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -48,20 +48,15 @@ public final class GryoPool {
     private Queue<GryoWriter> gryoWriters;
     private final GryoMapper mapper;
 
-    public GryoPool(final Configuration conf, final Consumer<GryoMapper.Builder> builderConsumer, final Consumer<Kryo> kryoConsumer) {
-        final GryoMapper.Builder mapperBuilder = GryoMapper.build();
-        tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.<IoRegistry>emptyList())).forEach(mapperBuilder::addRegistry);
-        builderConsumer.accept(mapperBuilder);
-        // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
-        this.mapper = mapperBuilder.create();
-        this.createPool(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER, this.mapper);
-        for (final GryoReader reader : this.gryoReaders) {
-            kryoConsumer.accept(reader.getKryo());
-        }
-        for (final GryoWriter writer : this.gryoWriters) {
-            kryoConsumer.accept(writer.getKryo());
-        }
+    public static GryoPool.Builder build() {
+        return new GryoPool.Builder();
+    }
 
+    /**
+     * Used by {@code GryoPool.Builder}.
+     */
+    private GryoPool() {
+        this.mapper = null;
     }
 
     /**
@@ -117,10 +112,6 @@ public final class GryoPool {
         }
     }
 
-    public GryoMapper getMapper() {
-        return this.mapper;
-    }
-
     public GryoReader takeReader() {
         final GryoReader reader = this.gryoReaders.poll();
         return null == reader ? GryoReader.build().mapper(mapper).create() : reader;
@@ -176,4 +167,59 @@ public final class GryoPool {
         });
         return registries;
     }
+
+    ////
+
+    public static class Builder {
+
+        private int poolSize = 256;
+        private Type type = Type.READER_WRITER;
+        private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
+        private Consumer<Kryo> kryoConsumer = null;
+        private Configuration configuration = null;
+
+        public Builder configuration(final Configuration configuration) {
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder poolSize(int poolSize) {
+            this.poolSize = poolSize;
+            return this;
+        }
+
+        public Builder type(final Type type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder initializeMapper(final Consumer<GryoMapper.Builder> gryoMapperConsumer) {
+            this.gryoMapperConsumer = gryoMapperConsumer;
+            return this;
+        }
+
+        public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
+            this.kryoConsumer = kryoConsumer;
+            return this;
+        }
+
+        public GryoPool create() {
+            final GryoMapper.Builder mapper = GryoMapper.build();
+            final GryoPool gryoPool = new GryoPool();
+            if (null != this.configuration)
+                tryCreateIoRegistry(this.configuration.getList(CONFIG_IO_REGISTRY, Collections.emptyList())).forEach(mapper::addRegistry);
+            if (null != this.gryoMapperConsumer)
+                this.gryoMapperConsumer.accept(mapper);
+            gryoPool.createPool(this.poolSize, this.type, mapper.create());
+            if (null != this.kryoConsumer) {
+                for (final GryoReader reader : gryoPool.gryoReaders) {
+                    kryoConsumer.accept(reader.getKryo());
+                }
+                for (final GryoWriter writer : gryoPool.gryoWriters) {
+                    kryoConsumer.accept(writer.getKryo());
+                }
+            }
+            return gryoPool;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index ee16126..7b12807 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -47,58 +47,58 @@ import scala.runtime.BoxedUnit;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class GryoSerializer extends Serializer {
-    private final boolean referenceTracking;
-    private final boolean registrationRequired;
+
     //private final Option<String> userRegistrator;
-    private final long bufferSizeKb;
     private final int bufferSize;
-    private final int maxBufferSizeMb;
     private final int maxBufferSize;
 
     private final GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
-        this.bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
-        if (this.bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
-            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + this.bufferSizeKb + " mb.");
+        final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
+        final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
+        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
+            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
-            this.bufferSize = (int) ByteUnit.KiB.toBytes(this.bufferSizeKb);
-            this.maxBufferSizeMb = (int) sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-            if (this.maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
-                throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + this.maxBufferSizeMb + " mb.");
+            this.bufferSize = (int) ByteUnit.KiB.toBytes(bufferSizeKb);
+            if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
+                throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + maxBufferSizeMb + " mb.");
             } else {
-                this.maxBufferSize = (int) ByteUnit.MiB.toBytes(this.maxBufferSizeMb);
-                this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-                this.registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+                this.maxBufferSize = (int) ByteUnit.MiB.toBytes(maxBufferSizeMb);
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
-
             }
         }
-        this.gryoPool = new GryoPool(makeApacheConfiguration(sparkConfiguration), builder -> {
-            try {
-                builder.
-                        addCustom(SerializableWritable.class, new JavaSerializer()).
-                        addCustom(Tuple2.class, new JavaSerializer()).
-                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                        addCustom(HttpBroadcast.class, new JavaSerializer()).
-                        addCustom(PythonBroadcast.class, new JavaSerializer()).
-                        addCustom(BoxedUnit.class, new JavaSerializer()).
-                        addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
-                        addCustom(MessagePayload.class, new JavaSerializer()).
-                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                        addCustom(ViewPayload.class, new JavaSerializer()).
-                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                        addCustom(VertexWritable.class, new JavaSerializer()).
-                        addCustom(ObjectWritable.class, new JavaSerializer());
-            } catch (final ClassNotFoundException e) {
-                throw new IllegalStateException(e);
-            }
-        }, kryo -> {
-            kryo.setRegistrationRequired(this.registrationRequired);
-            kryo.setReferences(this.referenceTracking);
-        });
-
+        this.gryoPool = GryoPool.build().
+                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+                configuration(makeApacheConfiguration(sparkConfiguration)).
+                initializeMapper(builder -> {
+                    try {
+                        builder.
+                                addCustom(SerializableWritable.class, new JavaSerializer()).
+                                addCustom(Tuple2.class, new JavaSerializer()).
+                                addCustom(CompressedMapStatus.class, new JavaSerializer()).
+                                addCustom(HttpBroadcast.class, new JavaSerializer()).
+                                addCustom(PythonBroadcast.class, new JavaSerializer()).
+                                addCustom(BoxedUnit.class, new JavaSerializer()).
+                                addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
+                                addCustom(MessagePayload.class, new JavaSerializer()).
+                                addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+                                addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+                                addCustom(ViewPayload.class, new JavaSerializer()).
+                                addCustom(SerializableConfiguration.class, new JavaSerializer()).
+                                addCustom(VertexWritable.class, new JavaSerializer()).
+                                addCustom(ObjectWritable.class, new JavaSerializer());
+                                // add these as we find ClassNotFoundExceptions
+                    } catch (final ClassNotFoundException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }).
+                initializeKryo(kryo -> {
+                    kryo.setRegistrationRequired(registrationRequired);
+                    kryo.setReferences(referenceTracking);
+                }).create();
     }
 
     public Output newOutput() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b32263e0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 916a3d7..a82b83f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -115,8 +115,8 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
             /// spark configuration
             put("spark.master", "local[4]");
             // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-            // put("spark.kryo.registrationRequired",true);
+            put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+            put("spark.kryo.registrationRequired", true);
         }};
     }
 


[13/14] incubator-tinkerpop git commit: bumped to Spark 1.5.1.

Posted by ok...@apache.org.
bumped to Spark 1.5.1.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c9b0a839
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c9b0a839
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c9b0a839

Branch: refs/heads/master
Commit: c9b0a839d6e46c0044286fbbc8172089202d1eeb
Parents: d165ece
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 11:48:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 11:48:57 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc    | 2 +-
 spark-gremlin/pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c9b0a839/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 01298d3..435ee3b 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,7 +30,7 @@ TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 * `GryoPool` now makes use of `GryoPool.Builder` for its construction.
 * Bumped to Apache Hadoop 2.7.1.
 * Bumped to Apache Giraph 1.1.0.
-* Bumped to Apache Spark 1.5.0.
+* Bumped to Apache Spark 1.5.1.
 * Split Hadoop-Gremlin apart such there is now `hadoop-gremlin`, `spark-gremlin`, and `giraph-gremlin` (and respective `GremlinPlugins`).
 * Added `LambdaCollectingBarrierStep` which generalizes `NoOpBarrierStep` and allows for `barrier(normSack)`-type operations.
 * Fixed bugs in the Gremlin Server's NIO protocol both on the server and driver side.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c9b0a839/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index e183419..73e52a9 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -104,7 +104,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
-            <version>1.5.0</version>
+            <version>1.5.1</version>
             <exclusions>
                 <!-- self conflicts -->
                 <exclusion>


[04/14] incubator-tinkerpop git commit: Merge branch 'master' into spark-gryo-tp31

Posted by ok...@apache.org.
Merge branch 'master' into spark-gryo-tp31


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a7294ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a7294ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a7294ea0

Branch: refs/heads/master
Commit: a7294ea0a04f293074b78c4428a008a0396fe5a2
Parents: fde13ed 5a8b61c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 13:07:34 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 13:07:34 2015 -0600

----------------------------------------------------------------------
 docs/static/images/renaissance-gremlin.png | Bin 0 -> 1234859 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------



[12/14] incubator-tinkerpop git commit: Refactoring GryoPool for spark usage.

Posted by ok...@apache.org.
Refactoring GryoPool for spark usage.

Removed getKryo() from GryoReader and GryoWriter. Kryo instances can now be obtained directly from the GryoPool. Added configuration options to GryoMapper Builder to allow for lower-level settings to be passed to Kryo. This allowed removal of the Consumer<Kryo> function passed to the GryoPool Builder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/d165ece9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/d165ece9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/d165ece9

Branch: refs/heads/master
Commit: d165ece905ef5f34c2c00bbabd00ff74006e8dc5
Parents: c5827d2
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Sep 29 13:08:12 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Sep 29 13:08:12 2015 -0400

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoMapper.java   | 45 ++++++++--
 .../gremlin/structure/io/gryo/GryoPool.java     | 92 +++++++++++---------
 .../gremlin/structure/io/gryo/GryoReader.java   |  4 -
 .../gremlin/structure/io/gryo/GryoWriter.java   |  4 -
 .../structure/io/gryo/GryoMapperTest.java       | 21 +++++
 .../io/gryo/GryoDeserializationStream.groovy    |  2 +-
 .../io/gryo/GryoSerializationStream.java        |  2 +-
 .../spark/structure/io/gryo/GryoSerializer.java | 35 ++++----
 .../io/gryo/GryoSerializerInstance.java         | 10 +--
 9 files changed, 136 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 619d657..cba2497 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -122,16 +122,22 @@ public final class GryoMapper implements Mapper<Kryo> {
     public static final byte[] GIO = "gio".getBytes();
     public static final byte[] HEADER = Arrays.copyOf(GIO, 16);
     private final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList;
+    private boolean registrationRequired;
+    private boolean referenceTracking;
 
-    private GryoMapper(final List<Triplet<Class, Function<Kryo, Serializer>, Integer>> serializationList) {
-        this.serializationList = serializationList;
+    private GryoMapper(final Builder builder) {
+        this.serializationList = builder.serializationList;
+        this.registrationRequired = builder.registrationRequired;
+        this.referenceTracking = builder.referenceTracking;
     }
 
     @Override
     public Kryo createMapper() {
         final Kryo kryo = new Kryo(new GryoClassResolver(), new MapReferenceResolver(), new DefaultStreamFactory());
         kryo.addDefaultSerializer(Map.Entry.class, new EntrySerializer());
-        kryo.setRegistrationRequired(true);
+        kryo.setRegistrationRequired(registrationRequired);
+        kryo.setReferences(referenceTracking);
+
         serializationList.forEach(p -> {
             final Function<Kryo, Serializer> serializer = p.getValue1();
             if (null == serializer)
@@ -251,13 +257,16 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Pair.class, kryo -> new PairSerializer(), 88)); // ***LAST ID**
         }};
 
-        private List<IoRegistry> registries = new ArrayList<>();
+        private final List<IoRegistry> registries = new ArrayList<>();
 
         /**
          * Starts numbering classes for Gryo serialization at 65536 to leave room for future usage by TinkerPop.
          */
         private final AtomicInteger currentSerializationId = new AtomicInteger(65536);
 
+        private boolean registrationRequired = true;
+        private boolean referenceTracking = true;
+
         private Builder() {
         }
 
@@ -299,6 +308,32 @@ public final class GryoMapper implements Mapper<Kryo> {
         }
 
         /**
+         * When set to {@code true}, all classes serialized by the {@code Kryo} instances created from this
+         * {@link GryoMapper} must have their classes known up front and registered appropriately through this
+         * builder.  By default this value is {@code true}.  This approach is more efficient than setting the
+         * value to {@code false}.
+         *
+         * @param registrationRequired set to {@code true} if the classes should be registered up front or
+         * {@code false} otherwise
+         */
+        public Builder registrationRequired(final boolean registrationRequired) {
+            this.registrationRequired = registrationRequired;
+            return this;
+        }
+
+        /**
+         * By default, each appearance of an object in the graph after the first is stored as an integer ordinal.
+         * This allows multiple references to the same object and cyclic graphs to be serialized. This has a small
+         * amount of overhead and can be disabled to save space if it is not needed.
+         *
+         * @param referenceTracking set to {@code true} to enable and {@code false} otherwise
+         */
+        public Builder referenceTracking(final boolean referenceTracking) {
+            this.referenceTracking = referenceTracking;
+            return this;
+        }
+
+        /**
          * Creates a {@code GryoMapper}.
          */
         public GryoMapper create() {
@@ -321,7 +356,7 @@ public final class GryoMapper implements Mapper<Kryo> {
                 });
             });
 
-            return new GryoMapper(serializationList);
+            return new GryoMapper(this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 0b6faab..e7bf636 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -45,6 +45,7 @@ public final class GryoPool {
 
     private Queue<GryoReader> gryoReaders;
     private Queue<GryoWriter> gryoWriters;
+    private Queue<Kryo> kryos;
     private GryoMapper mapper;
 
     public static GryoPool.Builder build() {
@@ -57,20 +58,13 @@ public final class GryoPool {
     private GryoPool() {
     }
 
-    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
-        this.mapper = gryoMapper;
-        if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
-            this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
-            for (int i = 0; i < poolSize; i++) {
-                this.gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
-            }
-        }
-        if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
-            this.gryoWriters = new LinkedBlockingQueue<>(poolSize);
-            for (int i = 0; i < poolSize; i++) {
-                this.gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
-            }
-        }
+    public GryoMapper getMapper() {
+        return mapper;
+    }
+
+    public Kryo takeKryo() {
+        final Kryo kryo = kryos.poll();
+        return null == kryo ? mapper.createMapper() : kryo;
     }
 
     public GryoReader takeReader() {
@@ -83,25 +77,63 @@ public final class GryoPool {
         return null == writer ? GryoWriter.build().mapper(mapper).create() : writer;
     }
 
+    public void offerKryo(final Kryo kryo) {
+        kryos.offer(kryo);
+    }
+
     public void offerReader(final GryoReader gryoReader) {
-        this.gryoReaders.offer(gryoReader);
+        gryoReaders.offer(gryoReader);
     }
 
     public void offerWriter(final GryoWriter gryoWriter) {
-        this.gryoWriters.offer(gryoWriter);
+        gryoWriters.offer(gryoWriter);
+    }
+
+    public <A> A readWithKryo(final Function<Kryo, A> kryoFunction) {
+        final Kryo kryo = takeKryo();
+        final A a = kryoFunction.apply(kryo);
+        offerKryo(kryo);
+        return a;
+    }
+
+    public void writeWithKryo(final Consumer<Kryo> kryoConsumer) {
+        final Kryo kryo = takeKryo();
+        kryoConsumer.accept(kryo);
+        offerKryo(kryo);
     }
 
     public <A> A doWithReader(final Function<GryoReader, A> readerFunction) {
-        final GryoReader gryoReader = this.takeReader();
+        final GryoReader gryoReader = takeReader();
         final A a = readerFunction.apply(gryoReader);
-        this.offerReader(gryoReader);
+        offerReader(gryoReader);
         return a;
     }
 
     public void doWithWriter(final Consumer<GryoWriter> writerFunction) {
-        final GryoWriter gryoWriter = this.takeWriter();
+        final GryoWriter gryoWriter = takeWriter();
         writerFunction.accept(gryoWriter);
-        this.offerWriter(gryoWriter);
+        offerWriter(gryoWriter);
+    }
+
+    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
+        this.mapper = gryoMapper;
+        if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
+            gryoReaders = new LinkedBlockingQueue<>(poolSize);
+            for (int i = 0; i < poolSize; i++) {
+                gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
+            }
+        }
+        if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
+            gryoWriters = new LinkedBlockingQueue<>(poolSize);
+            for (int i = 0; i < poolSize; i++) {
+                gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
+            }
+        }
+
+        kryos = new LinkedBlockingQueue<>(poolSize);
+        for (int i = 0; i < poolSize; i++) {
+            kryos.add(gryoMapper.createMapper());
+        }
     }
 
     ////
@@ -112,7 +144,6 @@ public final class GryoPool {
         private List<IoRegistry> ioRegistries = new ArrayList<>();
         private Type type = Type.READER_WRITER;
         private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
-        private Consumer<Kryo> kryoConsumer = null;
 
         /**
          * The {@code IoRegistry} class names to use for the {@code GryoPool}
@@ -170,17 +201,6 @@ public final class GryoPool {
         }
 
         /**
-         * A consumer to update all the {@link Kryo} instances for all {@code GryoReader} and {@code GryoWriter} instances.
-         *
-         * @param kryoConsumer the consumer
-         * @return the updated builder
-         */
-        public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
-            this.kryoConsumer = kryoConsumer;
-            return this;
-        }
-
-        /**
          * Create the {@code GryoPool} from this builder.
          *
          * @return the new pool
@@ -193,14 +213,6 @@ public final class GryoPool {
             if (null != this.gryoMapperConsumer)
                 this.gryoMapperConsumer.accept(mapper);
             gryoPool.createPool(this.poolSize, this.type, mapper.create());
-            if (null != this.kryoConsumer) {
-                for (final GryoReader reader : gryoPool.gryoReaders) {
-                    kryoConsumer.accept(reader.getKryo());
-                }
-                for (final GryoWriter writer : gryoPool.gryoWriters) {
-                    kryoConsumer.accept(writer.getKryo());
-                }
-            }
             return gryoPool;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index ebc0ebc..0080d68 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -67,10 +67,6 @@ public final class GryoReader implements GraphReader {
         this.batchSize = batchSize;
     }
 
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-
     /**
      * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
index 8ca8e51..d98b8c2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
@@ -51,10 +51,6 @@ public final class GryoWriter implements GraphWriter {
         this.kryo = gryoMapper.createMapper();
     }
 
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
index b500495..9bdec55 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapperTest.java
@@ -47,6 +47,27 @@ public class GryoMapperTest {
     }
 
     @Test
+    public void shouldSerializeWithoutRegistration() throws Exception {
+        final GryoMapper mapper = GryoMapper.build().registrationRequired(false).create();
+        final Kryo kryo = mapper.createMapper();
+        try (final OutputStream stream = new ByteArrayOutputStream()) {
+            final Output out = new Output(stream);
+            final IoX x = new IoX("x");
+            final IoY y = new IoY(100, 200);
+            kryo.writeClassAndObject(out, x);
+            kryo.writeClassAndObject(out, y);
+
+            try (final InputStream inputStream = new ByteArrayInputStream(out.toBytes())) {
+                final Input input = new Input(inputStream);
+                final IoX readX = (IoX) kryo.readClassAndObject(input);
+                final IoY readY = (IoY) kryo.readClassAndObject(input);
+                assertEquals(x, readX);
+                assertEquals(y, readY);
+            }
+        }
+    }
+
+    @Test
     public void shouldRegisterMultipleIoRegistryToSerialize() throws Exception {
         final GryoMapper mapper = GryoMapper.build()
                 .addRegistry(IoXIoRegistry.InstanceBased.getInstance())

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
index 4b508ea..d01a314 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -41,7 +41,7 @@ public final class GryoDeserializationStream extends DeserializationStream {
     @Override
     public <T> T readObject(final ClassTag<T> classTag) {
         try {
-            return this.gryoSerializer.getGryoPool().doWithReader { reader -> (T) reader.getKryo().readClassAndObject(this.input) }
+            return this.gryoSerializer.getGryoPool().readWithKryo { kryo -> (T) kryo.readClassAndObject(this.input) }
         } catch (final Throwable e) {
             if (e instanceof KryoException) {
                 final KryoException kryoException = (KryoException) e;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
index 33e809d..608dcca 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -40,7 +40,7 @@ public final class GryoSerializationStream extends SerializationStream {
 
     @Override
     public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
-        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
+        this.gryoSerializer.getGryoPool().writeWithKryo(kryo -> kryo.writeClassAndObject(this.output, t));
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7c409ef..f0b1147 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -77,29 +77,26 @@ public final class GryoSerializer extends Serializer {
                 ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
-                        builder.
-                                addCustom(SerializableWritable.class, new JavaSerializer()).
-                                addCustom(Tuple2.class, new JavaSerializer()).
-                                addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                                addCustom(HttpBroadcast.class, new JavaSerializer()).
-                                addCustom(PythonBroadcast.class, new JavaSerializer()).
-                                addCustom(BoxedUnit.class, new JavaSerializer()).
-                                addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
-                                addCustom(MessagePayload.class, new JavaSerializer()).
-                                addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                                addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                                addCustom(ViewPayload.class, new JavaSerializer()).
-                                addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                                addCustom(VertexWritable.class, new JavaSerializer()).
-                                addCustom(ObjectWritable.class, new JavaSerializer());
+                        builder.addCustom(SerializableWritable.class, new JavaSerializer())
+                            .addCustom(Tuple2.class, new JavaSerializer())
+                            .addCustom(CompressedMapStatus.class, new JavaSerializer())
+                            .addCustom(HttpBroadcast.class, new JavaSerializer())
+                            .addCustom(PythonBroadcast.class, new JavaSerializer())
+                            .addCustom(BoxedUnit.class, new JavaSerializer())
+                            .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
+                            .addCustom(MessagePayload.class, new JavaSerializer())
+                            .addCustom(ViewIncomingPayload.class, new JavaSerializer())
+                            .addCustom(ViewOutgoingPayload.class, new JavaSerializer())
+                            .addCustom(ViewPayload.class, new JavaSerializer())
+                            .addCustom(SerializableConfiguration.class, new JavaSerializer())
+                            .addCustom(VertexWritable.class, new JavaSerializer())
+                            .addCustom(ObjectWritable.class, new JavaSerializer())
+                            .referenceTracking(referenceTracking)
+                            .registrationRequired(registrationRequired);
                                 // add these as we find ClassNotFoundExceptions
                     } catch (final ClassNotFoundException e) {
                         throw new IllegalStateException(e);
                     }
-                }).
-                initializeKryo(kryo -> {
-                    kryo.setRegistrationRequired(registrationRequired);
-                    kryo.setReferences(referenceTracking);
                 }).create();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d165ece9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
index c0571c0..3f93c24 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -48,22 +48,22 @@ public final class GryoSerializerInstance extends SerializerInstance {
 
     @Override
     public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
-        this.gryoSerializer.getGryoPool().doWithWriter(writer -> writer.getKryo().writeClassAndObject(this.output, t));
+        this.gryoSerializer.getGryoPool().writeWithKryo(kryo -> kryo.writeClassAndObject(this.output, t));
         return ByteBuffer.wrap(this.output.getBuffer());
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
         this.input.setBuffer(byteBuffer.array());
-        return this.gryoSerializer.getGryoPool().doWithReader(reader -> (T) reader.getKryo().readClassAndObject(this.input));
+        return this.gryoSerializer.getGryoPool().readWithKryo(kryo -> (T) kryo.readClassAndObject(this.input));
     }
 
     @Override
     public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
         this.input.setBuffer(byteBuffer.array());
-        return this.gryoSerializer.getGryoPool().doWithReader(reader -> {
-            reader.getKryo().setClassLoader(classLoader);
-            return (T) reader.getKryo().readClassAndObject(this.input);
+        return this.gryoSerializer.getGryoPool().readWithKryo(kryo -> {
+            kryo.setClassLoader(classLoader);
+            return (T) kryo.readClassAndObject(this.input);
         });
     }
 


[02/14] incubator-tinkerpop git commit: random tweaks, but still buffer underflow.

Posted by ok...@apache.org.
random tweaks, but still buffer underflow.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a8b1439a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a8b1439a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a8b1439a

Branch: refs/heads/master
Commit: a8b1439acd7098c2ea458f960fadbbbb62bf5f98
Parents: 9110a1e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Sep 23 20:00:04 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Sep 23 20:00:04 2015 -0600

----------------------------------------------------------------------
 .../process/computer/io/gryo/GryoDeserializationStream.java     | 2 +-
 .../spark/process/computer/io/gryo/GryoSerializationStream.java | 5 ++---
 .../gremlin/spark/process/computer/io/gryo/GryoSerializer.java  | 4 ++--
 .../spark/process/computer/io/gryo/GryoSerializerInstance.java  | 3 ++-
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a8b1439a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
index f5e36f7..1d8039a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
@@ -29,7 +29,7 @@ import java.io.InputStream;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class GryoDeserializationStream extends DeserializationStream {
+public final class GryoDeserializationStream extends DeserializationStream {
 
     private final InputStream inputStream;
     private final GryoSerializerInstance serializer;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a8b1439a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
index e5981a0..f51444b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
@@ -20,7 +20,6 @@
 package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
 
 import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
 import scala.reflect.ClassTag;
 
 import java.io.IOException;
@@ -29,10 +28,10 @@ import java.io.OutputStream;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class GryoSerializationStream extends SerializationStream {
+public final class GryoSerializationStream extends SerializationStream {
 
     private final OutputStream outputStream;
-    private final SerializerInstance serializer;
+    private final GryoSerializerInstance serializer;
 
     public GryoSerializationStream(final GryoSerializerInstance serializer, final OutputStream outputStream) {
         this.outputStream = outputStream;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a8b1439a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
index c7c1468..c2e99a9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
@@ -39,7 +39,7 @@ import scala.Tuple2;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer {
     @Override
     public SerializerInstance newInstance() {
         return new GryoSerializerInstance(
@@ -56,7 +56,7 @@ public class GryoSerializer extends Serializer {
                         addCustom(SerializableConfiguration.class, new JavaSerializer()).
                         addCustom(VertexWritable.class, new JavaSerializer()).
                         addCustom(ObjectWritable.class, new JavaSerializer()).
-                create().createMapper());
+                        create().createMapper());
         // kryo.register(org.apache.spark.serializer.JavaIterableWrapperSerializer..MODULE$.wrapperClass(), new JavaIterableWrapperSerializer());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a8b1439a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
index 212abe2..9c74434 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
@@ -34,7 +34,7 @@ import java.nio.ByteBuffer;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class GryoSerializerInstance extends SerializerInstance {
+public final class GryoSerializerInstance extends SerializerInstance {
 
     private final Kryo kryo;
 
@@ -46,6 +46,7 @@ public class GryoSerializerInstance extends SerializerInstance {
     public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
         final Output output = new Output(100000);
         this.kryo.writeClassAndObject(output, t);
+        output.flush();
         return ByteBuffer.wrap(output.getBuffer());
     }
 


[03/14] incubator-tinkerpop git commit: GryoPool constructor added, GryoReader/Writer now have getKryo() public methods. GryoSerializer now exists as a Spark serializer which will connect with the Graph and get its registered serializers. Updated conf/pr

Posted by ok...@apache.org.
GryoPool constructor added, GryoReader/Writer now have getKryo() public methods. GryoSerializer now exists as a Spark serializer which will connect with the Graph and get its registered serializers. Updated the conf/ properties files with GryoSerializer as the new default serializer instead of Spark's KryoSerializer. Uncommented the GroupCountTest test which failed due to a serialization issue in 3.0.x.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fde13ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fde13ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fde13ed5

Branch: refs/heads/master
Commit: fde13ed5d9db8c18bd9e0d8eb4541755aa181697
Parents: a8b1439
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 13:03:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 13:03:11 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     |  36 +++++-
 .../gremlin/structure/io/gryo/GryoReader.java   |  66 +++++-----
 .../gremlin/structure/io/gryo/GryoWriter.java   |   4 +
 .../step/sideEffect/GroupCountTest.java         |   1 -
 hadoop-gremlin/conf/hadoop-gryo.properties      |   2 +-
 hadoop-gremlin/conf/hadoop-script.properties    |   2 +-
 .../io/gryo/GryoDeserializationStream.groovy    |  63 ++++++++++
 .../spark/process/computer/io/InputRDD.java     |   2 +-
 .../io/gryo/GryoDeserializationStream.java      |  55 --------
 .../io/gryo/GryoSerializationStream.java        |  69 ----------
 .../computer/io/gryo/GryoSerializer.java        |  62 ---------
 .../io/gryo/GryoSerializerInstance.java         |  77 ------------
 .../io/gryo/GryoSerializationStream.java        |  59 +++++++++
 .../spark/structure/io/gryo/GryoSerializer.java | 125 +++++++++++++++++++
 .../io/gryo/GryoSerializerInstance.java         |  87 +++++++++++++
 .../computer/HadoopSparkGraphProvider.java      |   2 +-
 16 files changed, 407 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 426997e..275009f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -47,6 +48,22 @@ public final class GryoPool {
     private Queue<GryoWriter> gryoWriters;
     private final GryoMapper mapper;
 
+    public GryoPool(final Configuration conf, final Consumer<GryoMapper.Builder> builderConsumer, final Consumer<Kryo> kryoConsumer) {
+        final GryoMapper.Builder mapperBuilder = GryoMapper.build();
+        tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.<IoRegistry>emptyList())).forEach(mapperBuilder::addRegistry);
+        builderConsumer.accept(mapperBuilder);
+        // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
+        this.mapper = mapperBuilder.create();
+        this.createPool(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER, this.mapper);
+        for (final GryoReader reader : this.gryoReaders) {
+            kryoConsumer.accept(reader.getKryo());
+        }
+        for (final GryoWriter writer : this.gryoWriters) {
+            kryoConsumer.accept(writer.getKryo());
+        }
+
+    }
+
     /**
      * Create a pool of readers and writers from a {@code Configuration} object.  There are two configuration keys
      * expected: "gremlin.io.registry" which defines comma separated list of the fully qualified class names of
@@ -73,30 +90,37 @@ public final class GryoPool {
      * Create a pool of a readers, writers or both of the specified size with an optional {@link IoRegistry} object
      * which would allow custom serializers to be registered to the pool.
      *
-     * @param poolSize initial size of the pool.
-     * @param type the type of pool.
+     * @param poolSize   initial size of the pool.
+     * @param type       the type of pool.
      * @param registries a list of registries to assign to each {@link GryoReader} and {@link GryoWriter} instances.
      */
     public GryoPool(final int poolSize, final Type type, final List<IoRegistry> registries) {
         final GryoMapper.Builder mapperBuilder = GryoMapper.build();
         registries.forEach(mapperBuilder::addRegistry);
-
         // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
-        mapper = mapperBuilder.create();
+        this.mapper = mapperBuilder.create();
+        createPool(poolSize, type, mapper);
+    }
+
+    private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
         if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
             this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
             for (int i = 0; i < poolSize; i++) {
-                this.gryoReaders.add(GryoReader.build().mapper(mapper).create());
+                this.gryoReaders.add(GryoReader.build().mapper(gryoMapper).create());
             }
         }
         if (type.equals(Type.WRITER) || type.equals(Type.READER_WRITER)) {
             this.gryoWriters = new LinkedBlockingQueue<>(poolSize);
             for (int i = 0; i < poolSize; i++) {
-                this.gryoWriters.add(GryoWriter.build().mapper(mapper).create());
+                this.gryoWriters.add(GryoWriter.build().mapper(gryoMapper).create());
             }
         }
     }
 
+    public GryoMapper getMapper() {
+        return this.mapper;
+    }
+
     public GryoReader takeReader() {
         final GryoReader reader = this.gryoReaders.poll();
         return null == reader ? GryoReader.build().mapper(mapper).create() : reader;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
index 234a04e..ebc0ebc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoReader.java
@@ -18,24 +18,23 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
 import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.Host;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 
 import java.io.IOException;
@@ -68,12 +67,16 @@ public final class GryoReader implements GraphReader {
         this.batchSize = batchSize;
     }
 
+    public Kryo getKryo() {
+        return this.kryo;
+    }
+
     /**
      * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
-     *                    {@link GraphWriter#writeGraph(OutputStream, Graph)}.
+     * @param inputStream    a stream containing an entire graph of vertices and edges as defined by the accompanying
+     *                       {@link GraphWriter#writeGraph(OutputStream, Graph)}.
      * @param graphToWriteTo the graph to write to when reading from the stream.
      * @throws IOException
      */
@@ -81,7 +84,7 @@ public final class GryoReader implements GraphReader {
     public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
         // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
         // have vertex labels in the output we can't do this single pass
-        final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
+        final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>();
         final AtomicLong counter = new AtomicLong(0);
 
         final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();
@@ -112,11 +115,11 @@ public final class GryoReader implements GraphReader {
      * Read {@link Vertex} objects from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
-     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
-     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
+     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
@@ -131,8 +134,8 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Vertex}  from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least a single vertex as defined by the accompanying
-     *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
+     * @param inputStream        a stream containing at least a single vertex as defined by the accompanying
+     *                           {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
      * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
      */
     @Override
@@ -144,10 +147,10 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
      * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
      *
-     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
-     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
-     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
-     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
+     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
+     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
+     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
+     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
      */
     @Override
@@ -163,8 +166,8 @@ public final class GryoReader implements GraphReader {
      * Read an {@link Edge} from output generated by {@link GryoWriter#writeEdge(OutputStream, Edge)} or via
      * an {@link Edge} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
-     *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
+     * @param inputStream      a stream containing at least one {@link Edge} as defined by the accompanying
+     *                         {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
      * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
      */
     @Override
@@ -180,14 +183,14 @@ public final class GryoReader implements GraphReader {
      * {@link GryoWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
      * to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
-     *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
+     * @param inputStream                a stream containing at least one {@link VertexProperty} as written by the accompanying
+     *                                   {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
      * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
      *                                   {@link Host} object.
      */
     @Override
-    public VertexProperty readVertexProperty (final InputStream inputStream,
-                                              final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
+    public VertexProperty readVertexProperty(final InputStream inputStream,
+                                             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
         final Input input = new Input(inputStream);
         readHeader(input);
         final Attachable<VertexProperty> attachable = kryo.readObject(input, DetachedVertexProperty.class);
@@ -198,8 +201,8 @@ public final class GryoReader implements GraphReader {
      * Read a {@link Property} from output generated by  {@link GryoWriter#writeProperty(OutputStream, Property)} or
      * via an {@link Property} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
      *
-     * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
-     *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
+     * @param inputStream          a stream containing at least one {@link Property} as written by the accompanying
+     *                             {@link GraphWriter#writeProperty(OutputStream, Property)} method.
      * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
      */
     @Override
@@ -215,7 +218,7 @@ public final class GryoReader implements GraphReader {
      * {@inheritDoc}
      */
     @Override
-    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException{
+    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
         return clazz.cast(this.kryo.readClassAndObject(new Input(inputStream)));
     }
 
@@ -230,7 +233,8 @@ public final class GryoReader implements GraphReader {
         kryo.readClassAndObject(input);
 
         final Vertex v = vertexMaker.apply(starGraph.getStarVertex());
-        if (edgeMaker != null) starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
+        if (edgeMaker != null)
+            starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e));
         return v;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
index d98b8c2..8ca8e51 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoWriter.java
@@ -51,6 +51,10 @@ public final class GryoWriter implements GraphWriter {
         this.kryo = gryoMapper.createMapper();
     }
 
+    /**
+     * Returns the {@link Kryo} instance held by this writer.
+     */
+    public Kryo getKryo() {
+        return this.kryo;
+    }
+
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
index 3bf232c..616daa2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountTest.java
@@ -84,7 +84,6 @@ public abstract class GroupCountTest extends AbstractGremlinProcessTest {
 
     @Test
     @LoadGraphWith(MODERN)
-    @Ignore // TODO: fix Spark integration
     public void g_V_outXcreatedX_groupCountXxX_capXxX() {
         final Traversal<Vertex, Map<Vertex, Long>> traversal = get_g_V_outXcreatedX_groupCountXxX_capXxX();
         final Object lopId = convertToVertexId("lop");

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index e92d517..75fb11f 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -27,7 +27,7 @@ gremlin.hadoop.outputLocation=output
 ####################################
 spark.master=local[4]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 # spark.kryo.registrationRequired=true
 # spark.storage.memoryFraction=0.2
 # spark.eventLog.enabled=true

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/hadoop-gremlin/conf/hadoop-script.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-script.properties b/hadoop-gremlin/conf/hadoop-script.properties
index 1099394..3b41598 100644
--- a/hadoop-gremlin/conf/hadoop-script.properties
+++ b/hadoop-gremlin/conf/hadoop-script.properties
@@ -28,7 +28,7 @@ gremlin.hadoop.outputLocation=output
 ####################################
 spark.master=local[4]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 # spark.kryo.registrationRequired=true
 # spark.storage.memoryFraction=0.2
 # spark.eventLog.enabled=true

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
new file mode 100644
index 0000000..6c6296d
--- /dev/null
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.groovy
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
+
+import org.apache.spark.serializer.DeserializationStream
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader
+import org.apache.tinkerpop.shaded.kryo.KryoException
+import org.apache.tinkerpop.shaded.kryo.io.Input
+import scala.reflect.ClassTag
+
+/**
+ * A Spark {@link DeserializationStream} that reads objects with {@link GryoReader}s taken from the Gryo pool
+ * of the supplied {@link GryoSerializerInstance}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoDeserializationStream extends DeserializationStream {
+
+    private final Input input;
+    private final GryoSerializerInstance gryoSerializer;
+
+    /**
+     * @param gryoSerializer the serializer instance whose Gryo pool supplies the readers
+     * @param inputStream    the stream to read from; it is wrapped in a Kryo {@link Input}
+     */
+    public GryoDeserializationStream(final GryoSerializerInstance gryoSerializer, final InputStream inputStream) {
+        this.gryoSerializer = gryoSerializer;
+        this.input = new Input(inputStream);
+    }
+
+    /**
+     * Reads the next class-and-object pair from the wrapped input using a pooled {@link GryoReader}.
+     * The reader is always handed back to the pool, whether the read succeeds or fails.
+     *
+     * @param classTag not consulted; the concrete type is read from the stream itself
+     * @return the deserialized object
+     * @throws EOFException if Kryo fails with a "buffer underflow" message
+     */
+    @Override
+    public <T> T readObject(final ClassTag<T> classTag) {
+        final GryoReader gryoReader = this.gryoSerializer.getGryoPool().takeReader();
+        try {
+            return (T) gryoReader.getKryo().readClassAndObject(this.input);
+        } catch (final KryoException e) {
+            // A "buffer underflow" is translated to EOFException -- presumably so that callers iterating the
+            // stream can detect its end. A KryoException without a message is rethrown untouched.
+            final String message = e.getMessage();
+            if (message != null && message.toLowerCase().contains("buffer underflow")) {
+                throw new EOFException();
+            }
+            throw e;
+        } finally {
+            // Return the reader even on failure; otherwise every underflow would drain the pool by one.
+            this.gryoSerializer.getGryoPool().offerReader(gryoReader);
+        }
+    }
+
+    /**
+     * Closes the Kryo {@link Input} wrapping the stream.
+     */
+    @Override
+    public void close() {
+        this.input.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
index 19d79a8..291fcd3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -33,7 +33,7 @@ public interface InputRDD {
 
     /**
      * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
+     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
      * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
      * @return an adjacency list representation of the underlying graph system.
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
deleted file mode 100644
index 1d8039a..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoDeserializationStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoDeserializationStream extends DeserializationStream {
-
-    private final InputStream inputStream;
-    private final GryoSerializerInstance serializer;
-
-    public GryoDeserializationStream(final GryoSerializerInstance serializer, final InputStream inputStream) {
-        this.serializer = serializer;
-        this.inputStream = inputStream;
-    }
-
-    @Override
-    public <T> T readObject(final ClassTag<T> classTag) {
-        return (T) this.serializer.getKryo().readClassAndObject(new Input(this.inputStream));
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.inputStream.close();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
deleted file mode 100644
index f51444b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializationStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.SerializationStream;
-import scala.reflect.ClassTag;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializationStream extends SerializationStream {
-
-    private final OutputStream outputStream;
-    private final GryoSerializerInstance serializer;
-
-    public GryoSerializationStream(final GryoSerializerInstance serializer, final OutputStream outputStream) {
-        this.outputStream = outputStream;
-        this.serializer = serializer;
-    }
-
-    @Override
-    public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
-        try {
-            this.outputStream.write(this.serializer.serialize(t, classTag).array());
-            this.outputStream.flush();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-        return this;
-    }
-
-    @Override
-    public void flush() {
-        try {
-            this.outputStream.flush();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.outputStream.close();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
deleted file mode 100644
index c2e99a9..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.SerializableWritable;
-import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
-import org.apache.spark.scheduler.CompressedMapStatus;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.util.SerializableConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
-import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializer extends Serializer {
-    @Override
-    public SerializerInstance newInstance() {
-        return new GryoSerializerInstance(
-                GryoMapper.build().
-                        addCustom(SerializableWritable.class, new JavaSerializer()).
-                        addCustom(Tuple2.class, new JavaSerializer()).
-                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
-                        addCustom(HttpBroadcast.class, new JavaSerializer()).
-                        addCustom(PythonBroadcast.class, new JavaSerializer()).
-                        addCustom(MessagePayload.class, new JavaSerializer()).
-                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
-                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
-                        addCustom(ViewPayload.class, new JavaSerializer()).
-                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
-                        addCustom(VertexWritable.class, new JavaSerializer()).
-                        addCustom(ObjectWritable.class, new JavaSerializer()).
-                        create().createMapper());
-        // kryo.register(org.apache.spark.serializer.JavaIterableWrapperSerializer..MODULE$.wrapperClass(), new JavaIterableWrapperSerializer());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
deleted file mode 100644
index 9c74434..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/gryo/GryoSerializerInstance.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo;
-
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
-import scala.reflect.ClassTag;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GryoSerializerInstance extends SerializerInstance {
-
-    private final Kryo kryo;
-
-    public GryoSerializerInstance(final Kryo kryo) {
-        this.kryo = kryo;
-    }
-
-    @Override
-    public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
-        final Output output = new Output(100000);
-        this.kryo.writeClassAndObject(output, t);
-        output.flush();
-        return ByteBuffer.wrap(output.getBuffer());
-    }
-
-    @Override
-    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
-        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
-    }
-
-    @Override
-    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
-        this.kryo.setClassLoader(classLoader);
-        return (T) this.kryo.readClassAndObject(new Input(byteBuffer.array()));
-    }
-
-    @Override
-    public SerializationStream serializeStream(final OutputStream outputStream) {
-        return new GryoSerializationStream(this, outputStream);
-    }
-
-    @Override
-    public DeserializationStream deserializeStream(final InputStream inputStream) {
-        return new GryoDeserializationStream(this, inputStream);
-    }
-
-    public Kryo getKryo() {
-        return this.kryo;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
new file mode 100644
index 0000000..6c1a164
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializationStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.OutputStream;
+
+/**
+ * A Spark {@link SerializationStream} that writes objects with {@link GryoWriter}s taken from the Gryo pool
+ * of the supplied {@link GryoSerializerInstance}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializationStream extends SerializationStream {
+
+    private final Output output;
+    private final GryoSerializerInstance gryoSerializer;
+
+    /**
+     * @param gryoSerializer the serializer instance whose Gryo pool supplies the writers
+     * @param outputStream   the stream to write to; it is wrapped in a Kryo {@link Output}
+     */
+    public GryoSerializationStream(final GryoSerializerInstance gryoSerializer, final OutputStream outputStream) {
+        this.output = new Output(outputStream);
+        this.gryoSerializer = gryoSerializer;
+    }
+
+    /**
+     * Writes the class and object to the wrapped output using a pooled {@link GryoWriter}.
+     * The writer is always handed back to the pool, whether the write succeeds or fails.
+     *
+     * @param t        the object to serialize
+     * @param classTag not consulted; the class is written alongside the object
+     * @return this stream, to allow chained writes
+     */
+    @Override
+    public <T> SerializationStream writeObject(final T t, final ClassTag<T> classTag) {
+        final GryoWriter writer = this.gryoSerializer.getGryoPool().takeWriter();
+        try {
+            writer.getKryo().writeClassAndObject(this.output, t);
+        } finally {
+            // Return the writer even when serialization throws, so a failed write does not shrink the pool.
+            this.gryoSerializer.getGryoPool().offerWriter(writer);
+        }
+        return this;
+    }
+
+    /**
+     * Flushes the Kryo {@link Output}.
+     */
+    @Override
+    public void flush() {
+        this.output.flush();
+    }
+
+    /**
+     * Closes the Kryo {@link Output}.
+     */
+    @Override
+    public void close() {
+        this.output.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
new file mode 100644
index 0000000..ee16126
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.python.PythonBroadcast;
+import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.network.util.ByteUnit;
+import org.apache.spark.scheduler.CompressedMapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A Spark {@link Serializer} backed by a {@link GryoPool}. Buffer sizes, reference tracking and the
+ * registration requirement are read from the same {@code spark.kryoserializer.*} / {@code spark.kryo.*}
+ * keys that are looked up in the constructor below.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializer extends Serializer {
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
+    //private final Option<String> userRegistrator;
+    private final long bufferSizeKb;
+    private final int bufferSize;
+    private final int maxBufferSizeMb;
+    private final int maxBufferSize;
+
+    private final GryoPool gryoPool;
+
+    /**
+     * Reads the serializer settings from the Spark configuration and builds the Gryo pool.
+     *
+     * @param sparkConfiguration the Spark configuration to read settings from
+     * @throws IllegalArgumentException if {@code spark.kryoserializer.buffer} or
+     *                                  {@code spark.kryoserializer.buffer.max} is 2 GiB or larger
+     */
+    public GryoSerializer(final SparkConf sparkConfiguration) {
+        this.bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
+        if (this.bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
+            // The value is held in KiB; convert it so the number printed matches the "mb" label.
+            throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + ByteUnit.KiB.toMiB(this.bufferSizeKb) + " mb.");
+        }
+        this.bufferSize = (int) ByteUnit.KiB.toBytes(this.bufferSizeKb);
+
+        // Range-check the value as a long before narrowing it, so an oversized setting cannot wrap past the check.
+        final long configuredMaxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
+        if (configuredMaxBufferSizeMb >= ByteUnit.GiB.toMiB(2L)) {
+            throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than 2048 mb, got: " + configuredMaxBufferSizeMb + " mb.");
+        }
+        this.maxBufferSizeMb = (int) configuredMaxBufferSizeMb;
+        this.maxBufferSize = (int) ByteUnit.MiB.toBytes(this.maxBufferSizeMb);
+
+        this.referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        this.registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
+
+        this.gryoPool = new GryoPool(makeApacheConfiguration(sparkConfiguration), builder -> {
+            try {
+                // Spark, Scala and Hadoop-Gremlin types registered here are serialized with Kryo's JavaSerializer.
+                builder.
+                        addCustom(SerializableWritable.class, new JavaSerializer()).
+                        addCustom(Tuple2.class, new JavaSerializer()).
+                        addCustom(CompressedMapStatus.class, new JavaSerializer()).
+                        addCustom(HttpBroadcast.class, new JavaSerializer()).
+                        addCustom(PythonBroadcast.class, new JavaSerializer()).
+                        addCustom(BoxedUnit.class, new JavaSerializer()).
+                        addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()).
+                        addCustom(MessagePayload.class, new JavaSerializer()).
+                        addCustom(ViewIncomingPayload.class, new JavaSerializer()).
+                        addCustom(ViewOutgoingPayload.class, new JavaSerializer()).
+                        addCustom(ViewPayload.class, new JavaSerializer()).
+                        addCustom(SerializableConfiguration.class, new JavaSerializer()).
+                        addCustom(VertexWritable.class, new JavaSerializer()).
+                        addCustom(ObjectWritable.class, new JavaSerializer());
+            } catch (final ClassNotFoundException e) {
+                throw new IllegalStateException(e);
+            }
+        }, kryo -> {
+            kryo.setRegistrationRequired(this.registrationRequired);
+            kryo.setReferences(this.referenceTracking);
+        });
+
+    }
+
+    /**
+     * Creates a new Kryo {@link Output} with the configured initial and maximum buffer sizes (in bytes).
+     */
+    public Output newOutput() {
+        return new Output(this.bufferSize, this.maxBufferSize);
+    }
+
+    /**
+     * Returns the {@link GryoPool} built by the constructor.
+     */
+    public GryoPool getGryoPool() {
+        return this.gryoPool;
+    }
+
+    /**
+     * Creates a new {@link GryoSerializerInstance} bound to this serializer.
+     */
+    @Override
+    public SerializerInstance newInstance() {
+        return new GryoSerializerInstance(this);
+    }
+
+    /**
+     * Copies every key/value pair of the Spark configuration into a new Apache Commons {@link Configuration}.
+     *
+     * @param sparkConfiguration the Spark configuration to copy
+     * @return a {@link BaseConfiguration} holding the same properties
+     */
+    private static Configuration makeApacheConfiguration(final SparkConf sparkConfiguration) {
+        final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        // Delimiter parsing is switched off before the values are copied in.
+        apacheConfiguration.setDelimiterParsingDisabled(true);
+        for (final Tuple2<String, String> tuple : sparkConfiguration.getAll()) {
+            apacheConfiguration.setProperty(tuple._1(), tuple._2());
+        }
+        return apacheConfiguration;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
new file mode 100644
index 0000000..764d8f0
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerInstance.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.reflect.ClassTag;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A Spark {@link SerializerInstance} that reads and writes objects with the Kryo instances held by the
+ * {@link GryoPool} of a {@link GryoSerializer}.
+ * <p>
+ * A single {@link Input} and a single {@link Output} are reused across calls, so one instance must not
+ * be used by more than one thread at a time.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializerInstance extends SerializerInstance {
+
+    private final GryoSerializer gryoSerializer;
+    private final Output output;
+    private final Input input;
+
+    /**
+     * Creates an instance backed by the given serializer.
+     *
+     * @param gryoSerializer the serializer that supplies the {@link GryoPool} and the {@link Output}
+     */
+    public GryoSerializerInstance(final GryoSerializer gryoSerializer) {
+        this.gryoSerializer = gryoSerializer;
+        this.input = new Input();
+        this.output = gryoSerializer.newOutput();
+    }
+
+    /**
+     * Serializes an object (class information included) into a new, exactly sized {@link ByteBuffer}.
+     *
+     * @param t        the object to serialize
+     * @param classTag the Scala class tag of {@code t}; not used by this implementation
+     * @return a buffer wrapping a private copy of the bytes written for {@code t}
+     */
+    @Override
+    public <T> ByteBuffer serialize(final T t, final ClassTag<T> classTag) {
+        final GryoPool gryoPool = this.gryoSerializer.getGryoPool();
+        final GryoWriter writer = gryoPool.takeWriter();
+        try {
+            // the output is shared between calls: reset it so earlier payloads do not precede this one
+            this.output.clear();
+            writer.getKryo().writeClassAndObject(this.output, t);
+            this.output.flush();
+            // toBytes() copies only the written bytes, so the result carries no trailing slack and is
+            // not overwritten by the next call to serialize()
+            return ByteBuffer.wrap(this.output.toBytes());
+        } finally {
+            // hand the writer back even when Kryo fails, otherwise the pool slowly drains
+            gryoPool.offerWriter(writer);
+        }
+    }
+
+    /**
+     * Deserializes the object stored in the remaining bytes of the given buffer. The position and limit
+     * of {@code byteBuffer} are left unchanged.
+     *
+     * @param byteBuffer the buffer holding a serialized object between its position and its limit
+     * @param classTag   the Scala class tag of the expected type; not used by this implementation
+     * @return the deserialized object
+     */
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassTag<T> classTag) {
+        if (byteBuffer.hasArray()) {
+            // honour the buffer's window: the backing array may be larger than the payload
+            this.input.setBuffer(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
+        } else {
+            // direct or read-only buffers expose no array: copy the remaining bytes via a duplicate
+            // so that the caller's position is not moved
+            final byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.duplicate().get(bytes);
+            this.input.setBuffer(bytes);
+        }
+        final GryoPool gryoPool = this.gryoSerializer.getGryoPool();
+        final GryoReader reader = gryoPool.takeReader();
+        try {
+            // the caller decides the expected type; Kryo returns whatever class was written
+            @SuppressWarnings("unchecked")
+            final T t = (T) reader.getKryo().readClassAndObject(this.input);
+            return t;
+        } finally {
+            // hand the reader back even when Kryo fails, otherwise the pool slowly drains
+            gryoPool.offerReader(reader);
+        }
+    }
+
+    /**
+     * Deserializes the object stored in the given buffer. The supplied class loader is ignored; the call
+     * is delegated to {@link #deserialize(ByteBuffer, ClassTag)}.
+     *
+     * @param byteBuffer  the buffer holding a serialized object
+     * @param classLoader ignored by this implementation
+     * @param classTag    the Scala class tag of the expected type
+     * @return the deserialized object
+     */
+    @Override
+    public <T> T deserialize(final ByteBuffer byteBuffer, final ClassLoader classLoader, final ClassTag<T> classTag) {
+        return this.deserialize(byteBuffer, classTag);
+    }
+
+    /**
+     * Creates a {@link GryoSerializationStream} for this instance and the given stream.
+     *
+     * @param outputStream the stream passed to the new serialization stream
+     * @return a new serialization stream
+     */
+    @Override
+    public SerializationStream serializeStream(final OutputStream outputStream) {
+        return new GryoSerializationStream(this, outputStream);
+    }
+
+    /**
+     * Creates a {@link GryoDeserializationStream} for this instance and the given stream.
+     *
+     * @param inputStream the stream passed to the new deserialization stream
+     * @return a new deserialization stream
+     */
+    @Override
+    public DeserializationStream deserializeStream(final InputStream inputStream) {
+        return new GryoDeserializationStream(this, inputStream);
+    }
+
+    /**
+     * Returns the {@link GryoPool} of the serializer that created this instance.
+     *
+     * @return the pool obtained from the backing {@link GryoSerializer}
+     */
+    public GryoPool getGryoPool() {
+        return this.gryoSerializer.getGryoPool();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fde13ed5/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
index 7ed741b..916a3d7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -115,7 +115,7 @@ public final class HadoopSparkGraphProvider extends AbstractGraphProvider {
             /// spark configuration
             put("spark.master", "local[4]");
             // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.process.computer.io.gryo.GryoSerializer");
+            put("spark.serializer","org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
             // put("spark.kryo.registrationRequired",true);
         }};
     }


[07/14] incubator-tinkerpop git commit: GryoPool now completely uses a GryoPool.Builder. Building a GryoPool is a lot more elegant and consistent now. Updated GryoPoolTest accordingly.

Posted by ok...@apache.org.
GryoPool now completely uses a GryoPool.Builder. Building a GryoPool is a lot more elegant and consistent now. Updated GryoPoolTest accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/acdebc43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/acdebc43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/acdebc43

Branch: refs/heads/master
Commit: acdebc43ae07b3f6c808b34e6aa04c032ad58cbd
Parents: 802cf6a
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:24:39 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:24:39 2015 -0600

----------------------------------------------------------------------
 .../apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/acdebc43/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
index a3ac294..9db1ba4 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
@@ -62,8 +62,7 @@ public class GryoPoolTest {
     @Test
     public void shouldConfigPoolOnConstructionWithPoolSizeOneAndNoIoRegistry() throws Exception {
         final Configuration conf = new BaseConfiguration();
-        conf.setProperty(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 1);
-        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
+        final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         final GryoReader reader = pool.takeReader();
         final GryoWriter writer = pool.takeWriter();
 


[08/14] incubator-tinkerpop git commit: updated asciidocs to have spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer in the example properties file. Updated CHANGELOG.

Posted by ok...@apache.org.
updated asciidocs to have spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer in the example properties file. Updated CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0ca73636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0ca73636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0ca73636

Branch: refs/heads/master
Commit: 0ca736369b0fd8f51a3b4b0681d7cec1c31cff67
Parents: acdebc4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:36:51 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:36:51 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                | 2 ++
 docs/src/implementations.asciidoc | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ca73636/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index cb6dcbf..4914b1a 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Added `GryoSerializer` which is a Spark `Serializer` and is the recommended serializer. Handles `Graph` and `GryoMapper` registries.
+* `GryoPool` now makes use of `GryoPool.Builder` for its construction.
 * Bumped to Apache Hadoop 2.7.1.
 * Bumped to Apache Giraph 1.1.0.
 * Bumped to Apache Spark 1.5.0.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0ca73636/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 3d1f0ce..3598acb 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -780,7 +780,7 @@ gremlin.hadoop.jarsInDistributedCache=true
 ####################################
 spark.master=local[4]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 #####################################
 # GiraphGraphComputer Configuration #
 #####################################


[06/14] incubator-tinkerpop git commit: GryoPool now completely uses a GryoPool.Builder. Building a GryoPool is a lot more elegant and consistent now. Updated GryoPoolTest accordingly.

Posted by ok...@apache.org.
GryoPool now completely uses a GryoPool.Builder. Building a GryoPool is a lot more elegant and consistent now. Updated GryoPoolTest accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/802cf6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/802cf6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/802cf6a2

Branch: refs/heads/master
Commit: 802cf6a2dac728d2b561c88a7c5f591c7b82591e
Parents: b32263e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:22:41 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:22:41 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoPool.java     | 151 ++++++++++---------
 .../gremlin/structure/io/gryo/GryoPoolTest.java |  18 +--
 .../hadoop/structure/io/HadoopPools.java        |  14 +-
 .../spark/structure/io/gryo/GryoSerializer.java |   4 +-
 4 files changed, 100 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 97884bb..0b6faab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
-import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 
@@ -46,7 +45,7 @@ public final class GryoPool {
 
     private Queue<GryoReader> gryoReaders;
     private Queue<GryoWriter> gryoWriters;
-    private final GryoMapper mapper;
+    private GryoMapper mapper;
 
     public static GryoPool.Builder build() {
         return new GryoPool.Builder();
@@ -56,48 +55,10 @@ public final class GryoPool {
      * Used by {@code GryoPool.Builder}.
      */
     private GryoPool() {
-        this.mapper = null;
-    }
-
-    /**
-     * Create a pool of readers and writers from a {@code Configuration} object.  There are two configuration keys
-     * expected: "gremlin.io.registry" which defines comma separated list of the fully qualified class names of
-     * {@link IoRegistry} implementations to use and the "gremlin.io.gryo.poolSize" which defines the initial size
-     * of the {@code GryoPool}.  As with usage of {@link GryoMapper.Builder#addRegistry(IoRegistry)}, the order in
-     * which these items are added matters greatly.  The order used for writing should be the order used for reading.
-     */
-    public GryoPool(final Configuration conf) {
-        this(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER,
-                tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.emptyList())));
-    }
-
-    /**
-     * Create a pool of readers and writers of specified size and use the default {@link GryoMapper} (which means
-     * that custom serializers from vendors will not be applied.
-     *
-     * @param poolSize initial size of the pool.
-     */
-    public GryoPool(final int poolSize) {
-        this(poolSize, Type.READER_WRITER, Collections.emptyList());
-    }
-
-    /**
-     * Create a pool of a readers, writers or both of the specified size with an optional {@link IoRegistry} object
-     * which would allow custom serializers to be registered to the pool.
-     *
-     * @param poolSize   initial size of the pool.
-     * @param type       the type of pool.
-     * @param registries a list of registries to assign to each {@link GryoReader} and {@link GryoWriter} instances.
-     */
-    public GryoPool(final int poolSize, final Type type, final List<IoRegistry> registries) {
-        final GryoMapper.Builder mapperBuilder = GryoMapper.build();
-        registries.forEach(mapperBuilder::addRegistry);
-        // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
-        this.mapper = mapperBuilder.create();
-        createPool(poolSize, type, mapper);
     }
 
     private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
+        this.mapper = gryoMapper;
         if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
             this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
             for (int i = 0; i < poolSize; i++) {
@@ -143,71 +104,92 @@ public final class GryoPool {
         this.offerWriter(gryoWriter);
     }
 
-    private static List<IoRegistry> tryCreateIoRegistry(final List<Object> classNames) {
-        if (classNames.isEmpty()) return Collections.emptyList();
-
-        final List<IoRegistry> registries = new ArrayList<>();
-        classNames.forEach(c -> {
-            try {
-                final String className = c.toString();
-                final Class<?> clazz = Class.forName(className);
-                try {
-                    final Method instanceMethod = clazz.getDeclaredMethod("getInstance");
-                    if (IoRegistry.class.isAssignableFrom(instanceMethod.getReturnType()))
-                        registries.add((IoRegistry) instanceMethod.invoke(null));
-                    else
-                        throw new Exception();
-                } catch (Exception methodex) {
-                    // tried getInstance() and that failed so try newInstance() no-arg constructor
-                    registries.add((IoRegistry) clazz.newInstance());
-                }
-            } catch (Exception ex) {
-                throw new IllegalStateException(ex);
-            }
-        });
-        return registries;
-    }
-
     ////
 
     public static class Builder {
 
         private int poolSize = 256;
+        private List<IoRegistry> ioRegistries = new ArrayList<>();
         private Type type = Type.READER_WRITER;
         private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
         private Consumer<Kryo> kryoConsumer = null;
-        private Configuration configuration = null;
 
-        public Builder configuration(final Configuration configuration) {
-            this.configuration = configuration;
+        /**
+         * The {@code IoRegistry} class names to use for the {@code GryoPool}
+         *
+         * @param ioRegistryClassNames a list of class names
+         * @return the updated builder
+         */
+        public Builder ioRegistries(final List<Object> ioRegistryClassNames) {
+            this.ioRegistries.addAll(tryCreateIoRegistry(ioRegistryClassNames));
+            return this;
+        }
+
+        /**
+         * The {@code IoRegistry} class name to use for the {@code GryoPool}
+         *
+         * @param ioRegistryClassName a class name
+         * @return the updated builder
+         */
+        public Builder ioRegistry(final Object ioRegistryClassName) {
+            this.ioRegistries.addAll(tryCreateIoRegistry(Collections.singletonList(ioRegistryClassName)));
             return this;
         }
 
+        /**
+         * The size of the {@code GryoPool}. The size can not be changed once created.
+         *
+         * @param poolSize the pool size
+         * @return the updated builder
+         */
         public Builder poolSize(int poolSize) {
             this.poolSize = poolSize;
             return this;
         }
 
+        /**
+         * The type of {@code GryoPool} to support -- see {@code Type}
+         *
+         * @param type the pool type
+         * @return the updated builder
+         */
         public Builder type(final Type type) {
             this.type = type;
             return this;
         }
 
+        /**
+         * A consumer to update the {@code GryoMapper.Builder} once constructed.
+         *
+         * @param gryoMapperConsumer the {@code GryoMapper.Builder} consumer
+         * @return the updated builder
+         */
         public Builder initializeMapper(final Consumer<GryoMapper.Builder> gryoMapperConsumer) {
             this.gryoMapperConsumer = gryoMapperConsumer;
             return this;
         }
 
+        /**
+         * A consumer to update all the {@link Kryo} instances for all {@code GryoReader} and {@code GryoWriter} instances.
+         *
+         * @param kryoConsumer the consumer
+         * @return the updated builder
+         */
         public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
             this.kryoConsumer = kryoConsumer;
             return this;
         }
 
+        /**
+         * Create the {@code GryoPool} from this builder.
+         *
+         * @return the new pool
+         */
         public GryoPool create() {
             final GryoMapper.Builder mapper = GryoMapper.build();
             final GryoPool gryoPool = new GryoPool();
-            if (null != this.configuration)
-                tryCreateIoRegistry(this.configuration.getList(CONFIG_IO_REGISTRY, Collections.emptyList())).forEach(mapper::addRegistry);
+            if (null != this.ioRegistries)
+                this.ioRegistries.forEach(mapper::addRegistry);
             if (null != this.gryoMapperConsumer)
                 this.gryoMapperConsumer.accept(mapper);
             gryoPool.createPool(this.poolSize, this.type, mapper.create());
@@ -221,5 +203,32 @@ public final class GryoPool {
             }
             return gryoPool;
         }
+
+        /////
+
+        /**
+         * Instantiates an {@link IoRegistry} for every entry of the supplied list.
+         * <p>
+         * Each entry is converted with {@code toString()} and loaded through {@code Class.forName}. A
+         * {@code getInstance()} method declared on the class is tried first (invoked without a receiver);
+         * if it is missing, does not return an {@code IoRegistry}, or fails when invoked, the no-arg
+         * constructor is used instead.
+         *
+         * @param classNames objects whose string form is the class name of an {@code IoRegistry}
+         * @return an empty list when {@code classNames} is empty, otherwise the registries in input order
+         * @throws IllegalStateException if a class cannot be loaded or instantiated
+         */
+        private static List<IoRegistry> tryCreateIoRegistry(final List<Object> classNames) {
+            if (classNames.isEmpty()) return Collections.emptyList();
+
+            final List<IoRegistry> created = new ArrayList<>();
+            for (final Object name : classNames) {
+                try {
+                    final Class<?> registryClass = Class.forName(name.toString());
+                    try {
+                        final Method factory = registryClass.getDeclaredMethod("getInstance");
+                        // a getInstance() with an unrelated return type is treated like a missing one
+                        if (!IoRegistry.class.isAssignableFrom(factory.getReturnType()))
+                            throw new Exception();
+                        created.add((IoRegistry) factory.invoke(null));
+                    } catch (Exception methodex) {
+                        // getInstance() was not usable, so fall back to the no-arg constructor
+                        created.add((IoRegistry) registryClass.newInstance());
+                    }
+                } catch (Exception ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            return created;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
index ea4bac7..a3ac294 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
 import org.apache.tinkerpop.gremlin.structure.io.IoX;
 import org.apache.tinkerpop.gremlin.structure.io.IoXIoRegistry;
 import org.apache.tinkerpop.gremlin.structure.io.IoY;
@@ -30,6 +29,7 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 
@@ -41,7 +41,7 @@ public class GryoPoolTest {
     @Test
     public void shouldDoWithReaderWriterMethods() throws Exception {
         final Configuration conf = new BaseConfiguration();
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) {
             pool.doWithWriter(writer -> writer.writeObject(os, 1));
             os.flush();
@@ -55,7 +55,7 @@ public class GryoPoolTest {
     @Test
     public void shouldConfigPoolOnConstructionWithDefaults() throws Exception {
         final Configuration conf = new BaseConfiguration();
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), 1, Integer.class);
     }
 
@@ -63,7 +63,7 @@ public class GryoPoolTest {
     public void shouldConfigPoolOnConstructionWithPoolSizeOneAndNoIoRegistry() throws Exception {
         final Configuration conf = new BaseConfiguration();
         conf.setProperty(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 1);
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         final GryoReader reader = pool.takeReader();
         final GryoWriter writer = pool.takeWriter();
 
@@ -88,7 +88,7 @@ public class GryoPoolTest {
     public void shouldConfigPoolOnConstructionWithCustomIoRegistryConstructor() throws Exception {
         final Configuration conf = new BaseConfiguration();
         conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.ConstructorBased.class.getName());
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
     }
 
@@ -96,7 +96,7 @@ public class GryoPoolTest {
     public void shouldConfigPoolOnConstructionWithCustomIoRegistryInstance() throws Exception {
         final Configuration conf = new BaseConfiguration();
         conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.InstanceBased.class.getName());
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
     }
 
@@ -105,7 +105,7 @@ public class GryoPoolTest {
         final Configuration conf = new BaseConfiguration();
         conf.setProperty(GryoPool.CONFIG_IO_REGISTRY,
                 IoXIoRegistry.InstanceBased.class.getName() + "," + IoYIoRegistry.InstanceBased.class.getName());
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoY(100, 200), IoY.class);
     }
@@ -113,7 +113,7 @@ public class GryoPoolTest {
     @Test(expected = IllegalArgumentException.class)
     public void shouldConfigPoolOnConstructionWithoutCustomIoRegistryAndFail() throws Exception {
         final Configuration conf = new BaseConfiguration();
-        final GryoPool pool = new GryoPool(conf);
+        final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
         assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
     }
 
@@ -121,7 +121,7 @@ public class GryoPoolTest {
     public void shouldConfigPoolOnConstructionWithoutBadIoRegistryAndFail() throws Exception {
         final Configuration conf = new BaseConfiguration();
         conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, "some.class.that.does.not.exist");
-        new GryoPool(conf);
+        GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
     }
 
     private static <T> void assertReaderWriter(final GryoWriter writer, final GryoReader reader, final T o,

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index b714e2e..9ece0ae 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -23,6 +23,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 
+import java.util.Collections;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -31,21 +33,21 @@ public final class HadoopPools {
     private HadoopPools() {
     }
 
-    private static GryoPool GRYO_POOL = new GryoPool(256);
+    private static GryoPool GRYO_POOL = GryoPool.build().create();
     private static boolean INITIALIZED = false;
 
     public synchronized static void initialize(final Configuration configuration) {
         if (!INITIALIZED) {
             INITIALIZED = true;
-            GRYO_POOL = new GryoPool(configuration);
+            GRYO_POOL = GryoPool.build().
+                    poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+                    ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+                    create();
         }
     }
 
     public synchronized static void initialize(final org.apache.hadoop.conf.Configuration configuration) {
-        if (!INITIALIZED) {
-            INITIALIZED = true;
-            GRYO_POOL = new GryoPool(ConfUtil.makeApacheConfiguration(configuration));
-        }
+        HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration));
     }
 
     public static GryoPool getGryoPool() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7b12807..7c409ef 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -43,6 +43,8 @@ import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
 import scala.Tuple2;
 import scala.runtime.BoxedUnit;
 
+import java.util.Collections;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -72,7 +74,7 @@ public final class GryoSerializer extends Serializer {
         }
         this.gryoPool = GryoPool.build().
                 poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
-                configuration(makeApacheConfiguration(sparkConfiguration)).
+                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
                         builder.


[09/14] incubator-tinkerpop git commit: removed unneeded comment about Spark 1.3 and registering classes. With GryoSerializer, this is all handled now.

Posted by ok...@apache.org.
removed unneeded comment about Spark 1.3 and registering classes. With GryoSerializer, this is all handled now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b4a009c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b4a009c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b4a009c7

Branch: refs/heads/master
Commit: b4a009c799527386be94c669be9dce9064845f85
Parents: 0ca7363
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:55:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:55:14 2015 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkGraphComputer.java   | 8 --------
 1 file changed, 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4a009c7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index be5c19e..99a07ba 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -97,14 +97,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             // wire up a spark context
             final SparkConf sparkConfiguration = new SparkConf();
             sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
-                    /*final List<Class> classes = new ArrayList<>();
-                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
-                    classes.addAll(IOClasses.getSharedHadoopClasses());
-                    classes.add(ViewPayload.class);
-                    classes.add(MessagePayload.class);
-                    classes.add(ViewIncomingPayload.class);
-                    classes.add(ViewOutgoingPayload.class);
-                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
 
             // create the spark configuration from the graph computer configuration
             hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));