You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 20:21:49 UTC
tinkerpop git commit: lots of good stuff here -- finally have testing
of IORegistry in SparkGraphComputer. Have ToyPoint and TestIoRegistry.
Realized a bunch of stupid .flush() calls in the GryoSerializer serializers
(Spark)... perhaps that is why things
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1389 a97ba5745 -> 77fb25fff
lots of good stuff here -- finally have testing of IORegistry in SparkGraphComputer. Have ToyPoint and TestIoRegistry. Realized a bunch of stupid .flush() calls in the GryoSerializer serializers (Spark)... perhaps that is why things are slower than KryoSerializer. Really cleaned up IoRegistryAwareKryoSerializer. Added getShim() methods to the Shaded/UnshadedSerializerAdaptors. I'm a stud. cc/ @dalaro
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/77fb25ff
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/77fb25ff
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/77fb25ff
Branch: refs/heads/TINKERPOP-1389
Commit: 77fb25fff97ce3fc3c65fb253e1b851b7956aaaf
Parents: a97ba57
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Oct 26 14:21:42 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 14:21:42 2016 -0600
----------------------------------------------------------------------
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 2 +-
.../shaded/ShadedSerializerAdapter.java | 6 +-
.../gremlin/hadoop/HadoopGraphProvider.java | 1 -
.../io/gryo/CompactBufferSerializer.groovy | 2 -
.../io/gryo/IoRegistryAwareKryoSerializer.java | 39 +++----
.../io/gryo/ObjectWritableSerializer.java | 1 -
.../structure/io/gryo/Tuple2Serializer.java | 2 -
.../structure/io/gryo/Tuple3Serializer.java | 3 -
.../io/gryo/VertexWritableSerializer.java | 1 -
.../io/gryo/WrappedArraySerializer.java | 1 -
.../unshaded/UnshadedSerializerAdapter.java | 18 ++--
.../gremlin/spark/AbstractSparkTest.java | 7 +-
.../structure/io/gryo/GryoIoRegistryTest.java | 101 +++++++++++++++++++
.../spark/structure/io/gryo/TestIoRegistry.java | 40 ++++++++
.../spark/structure/io/gryo/ToyPoint.java | 75 ++++++++++++++
15 files changed, 255 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 5f50f9e..9287b10 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -114,7 +114,7 @@ public class KryoShimServiceLoader {
throw new IllegalStateException("Unable to load KryoShimService");
// once the shim service is defined, configure it
- log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################",
+ log.info("Configuring KryoShimService {} with following configuration:\n#######START########\n{}\n########END#########",
cachedShimService.getClass().getCanonicalName(),
ConfigurationUtils.toString(configuration));
cachedShimService.applyConfiguration(configuration);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
index 28a44bd..fca19c7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java
@@ -26,7 +26,7 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
public class ShadedSerializerAdapter<T> extends Serializer<T> {
- SerializerShim<T> serializer;
+ private final SerializerShim<T> serializer;
public ShadedSerializerAdapter(final SerializerShim<T> serializer) {
this.serializer = serializer;
@@ -51,4 +51,8 @@ public class ShadedSerializerAdapter<T> extends Serializer<T> {
final ShadedInputAdapter shadedInputAdapter = new ShadedInputAdapter(input);
return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
}
+
+ public SerializerShim<T> getSerializerShim() {
+ return this.serializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 9c6a352..c95ede5 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -111,7 +111,6 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
@Override
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
- System.clearProperty(KRYO_SHIM_SERVICE);
this.graphSONInput = RANDOM.nextBoolean();
return new HashMap<String, Object>() {{
put(Graph.GRAPH, HadoopGraph.class.getName());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
index be491c4..693ced6 100644
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
@@ -44,10 +44,8 @@ public final class CompactBufferSerializer<T> extends Serializer<CompactBuffer<T
kryo.writeClassAndObject(output, compactBuffer.evidence$1);
kryo.writeClassAndObject(output, compactBuffer.element0);
kryo.writeClassAndObject(output, compactBuffer.element1);
- output.flush();
output.writeVarInt(compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize, true);
kryo.writeClassAndObject(output, compactBuffer.otherElements);
- output.flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index 6d9b536..ba6d001 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -31,41 +31,46 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.Un
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
/**
* A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
*/
-public class IoRegistryAwareKryoSerializer extends KryoSerializer {
-
- private final SparkConf configuration;
+public final class IoRegistryAwareKryoSerializer extends KryoSerializer {
private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
+ private final List<TypeRegistration<?>> typeRegistrations = new ArrayList<>();
+
public IoRegistryAwareKryoSerializer(final SparkConf configuration) {
super(configuration);
- // store conf so that we can access its registry (if one is present) in newKryo()
- this.configuration = configuration;
+ if (!configuration.contains(GryoPool.CONFIG_IO_REGISTRY))
+ log.info("SparkConf does not contain a {} property. Skipping {} processing.", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
+ else {
+ final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
+ for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
+ log.info("Registering {} with serializer type: {}", type.getTargetClass().getCanonicalName(), type);
+ this.typeRegistrations.add(type);
+ }
+ }
}
@Override
public Kryo newKryo() {
final Kryo kryo = super.newKryo();
- return applyIoRegistryIfPresent(kryo);
- }
-
- private Kryo applyIoRegistryIfPresent(final Kryo kryo) {
- if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) {
- log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
- return kryo;
- }
- final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create();
- for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) {
- log.info("Registering {} with serializer {} and id {}", type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId());
- kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId());
+ for (final TypeRegistration<?> type : this.typeRegistrations) {
+ if (null != type.getSerializerShim())
+ kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(type.getSerializerShim()), type.getId());
+ else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter)
+ kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) type.getShadedSerializer()).getSerializerShim()), type.getId());
+ else
+ kryo.register(type.getTargetClass(), type.getId());
}
return kryo;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
index 01be50d..2ec9615 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
@@ -37,7 +37,6 @@ public final class ObjectWritableSerializer<T> implements SerializerShim<ObjectW
@Override
public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ObjectWritable<T> starGraph) {
kryo.writeClassAndObject(output, starGraph.get());
- output.flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
index 05d0b9e..c610286 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
@@ -34,9 +34,7 @@ public final class Tuple2Serializer<A, B> extends Serializer<Tuple2<A, B>> {
@Override
public void write(final Kryo kryo, final Output output, final Tuple2<A, B> tuple2) {
kryo.writeClassAndObject(output, tuple2._1());
- output.flush();
kryo.writeClassAndObject(output, tuple2._2());
- output.flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
index f188794..46a7d02 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
@@ -33,11 +33,8 @@ public final class Tuple3Serializer<A, B, C> extends Serializer<Tuple3<A, B, C>>
@Override
public void write(final Kryo kryo, final Output output, final Tuple3<A, B, C> tuple3) {
kryo.writeClassAndObject(output, tuple3._1());
- output.flush();
kryo.writeClassAndObject(output, tuple3._2());
- output.flush();
kryo.writeClassAndObject(output, tuple3._3());
- output.flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
index c89fb05..93c86d8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
@@ -38,7 +38,6 @@ public final class VertexWritableSerializer implements SerializerShim<VertexWrit
@Override
public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final VertexWritable vertexWritable) {
kryo.writeObject(output, vertexWritable.get().graph());
- output.flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
index 8de1955..803a19c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -36,7 +36,6 @@ public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>>
output.writeVarInt(iterable.size(), true);
JavaConversions.asJavaCollection(iterable).forEach(t -> {
kryo.writeClassAndObject(output, t);
- output.flush();
});
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
index a5f8b05..452c47a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
@@ -16,12 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-/**
- * Copyright DataStax, Inc.
- *
- * Please see the included license file for details.
- */
package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
import com.esotericsoftware.kryo.Kryo;
@@ -30,10 +24,9 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
-public class UnshadedSerializerAdapter<T> extends Serializer<T>
-{
+public class UnshadedSerializerAdapter<T> extends Serializer<T> {
- SerializerShim<T> serializer;
+ private final SerializerShim<T> serializer;
public UnshadedSerializerAdapter(final SerializerShim<T> serializer) {
this.serializer = serializer;
@@ -48,10 +41,13 @@ public class UnshadedSerializerAdapter<T> extends Serializer<T>
}
@Override
- public T read(final Kryo kryo, final Input input, final Class<T> aClass)
- {
+ public T read(final Kryo kryo, final Input input, final Class<T> aClass) {
UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo);
UnshadedInputAdapter shadedInputAdapter = new UnshadedInputAdapter(input);
return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
}
+
+ public SerializerShim<T> getSerializerShim() {
+ return this.serializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
index 6d2231f..ab2cf2f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
@@ -45,7 +46,7 @@ public abstract class AbstractSparkTest {
public void setupTest() {
SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(this.getClass().getCanonicalName() + "-setupTest");
- sparkConfiguration.set("spark.master", "local[4]");
+ sparkConfiguration.set(SparkLauncher.SPARK_MASTER, "local[4]");
JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
sparkContext.close();
Spark.create(sparkContext.sc());
@@ -56,9 +57,9 @@ public abstract class AbstractSparkTest {
protected Configuration getBaseConfiguration() {
final BaseConfiguration configuration = new BaseConfiguration();
configuration.setDelimiterParsingDisabled(true);
- configuration.setProperty("spark.master", "local[4]");
+ configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[4]");
configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
- configuration.setProperty("spark.kryo.registrationRequired", true);
+ configuration.setProperty(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, true);
configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
return configuration;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
new file mode 100644
index 0000000..0260d02
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordWriter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoIoRegistryTest extends AbstractSparkTest {
+
+ @Test
+ public void shouldSupportIoRegistry() throws Exception {
+ final File input = TestHelper.generateTempFile(this.getClass(), "input", ".kryo");
+ final Configuration configuration = super.getBaseConfiguration();
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+ configuration.setProperty(GryoPool.CONFIG_IO_REGISTRY, TestIoRegistry.class.getCanonicalName());
+ //configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ configuration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
+ configuration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
+
+ HadoopGraph graph = HadoopGraph.open(configuration);
+
+ final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(configuration));
+ for (int i = 0; i < 10; i++) {
+ final StarGraph starGraph = StarGraph.open();
+ starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i);
+ writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex()));
+ }
+ writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(configuration), new TaskAttemptID()));
+ // OLAP TESTING //
+ final List<ToyPoint> points = graph.traversal().withComputer(SparkGraphComputer.class).V().<ToyPoint>values("point").toList();
+ assertEquals(10, points.size());
+ for (int i = 0; i < 10; i++) {
+ assertTrue(points.contains(new ToyPoint(i, i * 10)));
+ }
+ points.clear();
+ // OLTP TESTING //
+ graph.traversal().V().<ToyPoint>values("point").fill(points);
+ assertEquals(10, points.size());
+ for (int i = 0; i < 10; i++) {
+ assertTrue(points.contains(new ToyPoint(i, i * 10)));
+ }
+ points.clear();
+ // HDFS TESTING //
+ final List<Vertex> list = IteratorUtils.asList(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(configuration)).head(input.getAbsolutePath(), GryoInputFormat.class));
+ list.forEach(v -> points.add(v.value("point")));
+ assertEquals(10, points.size());
+ for (int i = 0; i < 10; i++) {
+ assertTrue(points.contains(new ToyPoint(i, i * 10)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
new file mode 100644
index 0000000..9a78aab
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TestIoRegistry extends AbstractIoRegistry {
+
+ private static final TestIoRegistry INSTANCE = new TestIoRegistry();
+
+ private TestIoRegistry() {
+ super.register(GryoIo.class, ToyPoint.class, new ShadedSerializerAdapter<>(new ToyPoint.ToyPointSerializer()));
+ }
+
+ public static TestIoRegistry getInstance() {
+ return INSTANCE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/77fb25ff/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
new file mode 100644
index 0000000..e46e9c3
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ToyPoint implements Serializable {
+
+ private final int x;
+ private final int y;
+
+ public ToyPoint(final int x, final int y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ public int getX() {
+ return this.x;
+ }
+
+ public int getY() {
+ return this.y;
+ }
+
+ public int hashCode() {
+ return this.x + this.y;
+ }
+
+ public boolean equals(final Object other) {
+ return other instanceof ToyPoint && ((ToyPoint) other).x == this.x && ((ToyPoint) other).y == this.y;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + this.x + "," + this.y + "]";
+ }
+
+ public static class ToyPointSerializer implements SerializerShim<ToyPoint> {
+ @Override
+ public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ToyPoint toyPoint) {
+ output.writeInt(toyPoint.x);
+ output.writeInt(toyPoint.y);
+ }
+
+ @Override
+ public <I extends InputShim> ToyPoint read(final KryoShim<I, ?> kryo, final I input, final Class<ToyPoint> toyPointClass) {
+ return new ToyPoint(input.readInt(), input.readInt());
+ }
+ }
+}