You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/06 20:53:49 UTC
[10/10] incubator-tinkerpop git commit: Able to now test both shim
and non-shim models in Spark. Also got configuration with ProgramTest working.
Able to now test both shim and non-shim models in Spark. Also got configuration with ProgramTest working.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e7003635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e7003635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e7003635
Branch: refs/heads/TINKERPOP-1321
Commit: e7003635e27c625b3f30492111f20f4fe4e24eb5
Parents: 0cd31bf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jun 6 14:52:53 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jun 6 14:52:53 2016 -0600
----------------------------------------------------------------------
.../structure/io/gryo/GryoSerializers.java | 8 +-
.../structure/io/gryo/GryoRegistrator.java | 90 ++++++++++++++++----
.../computer/SparkHadoopGraphProvider.java | 11 ++-
3 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
index 16fbe85..2042a4a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
@@ -47,7 +47,7 @@ public final class GryoSerializers {
/**
* Serializes any {@link Edge} implementation encountered to a {@link DetachedEdge}.
*/
- final static class EdgeSerializer implements SerializerShim<Edge> {
+ public final static class EdgeSerializer implements SerializerShim<Edge> {
@Override
public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Edge edge) {
kryo.writeClassAndObject(output, DetachedFactory.detach(edge, true));
@@ -63,7 +63,7 @@ public final class GryoSerializers {
/**
* Serializes any {@link Vertex} implementation encountered to an {@link DetachedVertex}.
*/
- final static class VertexSerializer implements SerializerShim<Vertex> {
+ public final static class VertexSerializer implements SerializerShim<Vertex> {
@Override
public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Vertex vertex) {
kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, true));
@@ -78,7 +78,7 @@ public final class GryoSerializers {
/**
* Serializes any {@link Property} implementation encountered to an {@link DetachedProperty}.
*/
- final static class PropertySerializer implements SerializerShim<Property> {
+ public final static class PropertySerializer implements SerializerShim<Property> {
@Override
public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Property property) {
kryo.writeClassAndObject(output, property instanceof VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : DetachedFactory.detach(property));
@@ -93,7 +93,7 @@ public final class GryoSerializers {
/**
* Serializes any {@link VertexProperty} implementation encountered to an {@link DetachedVertexProperty}.
*/
- final static class VertexPropertySerializer implements SerializerShim<VertexProperty> {
+ public final static class VertexPropertySerializer implements SerializerShim<VertexProperty> {
@Override
public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexProperty vertexProperty) {
kryo.writeClassAndObject(output, DetachedFactory.detach(vertexProperty, true));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
index 1ae8c5c..9563408 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
@@ -22,19 +22,44 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.MutablePath;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoSerializers;
import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.mutable.WrappedArray;
import java.util.*;
@@ -55,11 +80,11 @@ public class GryoRegistrator implements KryoRegistrator {
* Register TinkerPop's classes with the supplied {@link Kryo} instance
* while honoring optional overrides and optional class blacklist ("blackset"?).
*
- * @param kryo the Kryo serializer instance with which to register types
+ * @param kryo the Kryo serializer instance with which to register types
* @param serializerOverrides serializer mappings that override this class's defaults
- * @param blacklist classes which should not be registered at all, even if there is an override entry
- * or if they would be registered by this class by default (does not affect Kryo's
- * built-in registrations, e.g. String.class).
+ * @param blacklist classes which should not be registered at all, even if there is an override entry
+ * or if they would be registered by this class by default (does not affect Kryo's
+ * built-in registrations, e.g. String.class).
*/
public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> serializerOverrides, Set<Class<?>> blacklist) {
// Apply TinkerPop type registrations copied from GryoSerializer's constructor
@@ -111,11 +136,11 @@ public class GryoRegistrator implements KryoRegistrator {
} else {
// There's supposed to be a check in GryoMapper that prevents this from happening
log.error("GryoMapper's default serialization registration for {} is a {}. " +
- "This is probably a bug in TinkerPop (this is not a valid default registration). " +
- "I am configuring Spark to use Kryo's default serializer for this class, " +
- "but this may cause serialization failures at runtime.",
- tr.getTargetClass(),
- org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+ "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+ "I am configuring Spark to use Kryo's default serializer for this class, " +
+ "but this may cause serialization failures at runtime.",
+ tr.getTargetClass(),
+ org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
kryo.register(tr.getTargetClass());
}
} else if (null != serializerShim) {
@@ -127,12 +152,12 @@ public class GryoRegistrator implements KryoRegistrator {
} else if (null != functionOfShadedKryo) {
// As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening
log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " +
- "This is probably a bug in TinkerPop (this is not a valid default registration). " +
- "I am configuring Spark to use Kryo's default serializer instead of this function, " +
- "but this may cause serialization failures at runtime.",
- tr.getTargetClass(),
- org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
- org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+ "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+ "I am configuring Spark to use Kryo's default serializer instead of this function, " +
+ "but this may cause serialization failures at runtime.",
+ tr.getTargetClass(),
+ org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
+ org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
kryo.register(tr.getTargetClass());
} else {
// Register all other classes with the default behavior (FieldSerializer)
@@ -164,13 +189,46 @@ public class GryoRegistrator implements KryoRegistrator {
// duplication, but it would be a bit cumbersome to do so without disturbing
// the ordering of the existing entries in that constructor, since not all
// of the entries are for TinkerPop (and the ordering is significant).
+ if (Boolean.valueOf(System.getProperty("is.testing", "false"))) {
+ try {
+ m.put(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer());
+ m.put(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer());
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ m.put(WrappedArray.ofRef.class, null);
m.put(MessagePayload.class, null);
m.put(ViewIncomingPayload.class, null);
m.put(ViewOutgoingPayload.class, null);
m.put(ViewPayload.class, null);
m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
-
+ //
+ m.put(HadoopConfiguration.class, null);
+ //
+ m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer()));
+ m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer()));
+ m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));
+ m.put(HadoopEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer()));
+ //
+ m.put(ComputerGraph.ComputerVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer()));
+ m.put(ComputerGraph.ComputerVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer()));
+ m.put(ComputerGraph.ComputerProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));
+ m.put(ComputerGraph.ComputerEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer()));
+ //
+ m.put(StarGraph.StarEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer()));
+ m.put(StarGraph.StarVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer()));
+ m.put(StarGraph.StarProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));
+ m.put(StarGraph.StarVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer()));
+ //
+ m.put(MutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer()));
+ m.put(ImmutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer()));
+ try {
+ m.put(Class.forName(ImmutablePath.class.getCanonicalName() + "$TailPath"), new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer()));
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
return m;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 85552ce..7737d1e 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.process.computer.Computer;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -40,10 +41,14 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.util.Map;
+import static org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader.SHIM_CLASS_SYSTEM_PROPERTY;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -83,9 +88,13 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
config.put("spark.master", "local[4]");
- if (false) {
+ if (RANDOM.nextBoolean()) {
+ System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName());
+ KryoShimServiceLoader.load(true);
config.put("spark.serializer", GryoSerializer.class.getCanonicalName());
} else {
+ System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, UnshadedKryoShimService.class.getCanonicalName());
+ KryoShimServiceLoader.load(true);
config.put("spark.serializer", KryoSerializer.class.getCanonicalName());
config.put("spark.kryo.registrator", GryoRegistrator.class.getCanonicalName());
}