You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this text rendering.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/03 19:03:21 UTC

incubator-tinkerpop git commit: Created specialized serializers for common classes in Spark to avoid the overhead of JavaSerialization.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-962 f7ad5c4f6 -> b824d0c09


Created specialized serializers for common classes in Spark to avoid the overhead of JavaSerialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b824d0c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b824d0c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b824d0c0

Branch: refs/heads/TINKERPOP-962
Commit: b824d0c0994276e3714dc59341aa24526127eafe
Parents: f7ad5c4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Feb 3 11:03:23 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Feb 3 11:03:23 2016 -0700

----------------------------------------------------------------------
 .../io/gryo/CompactBufferSerializer.groovy      | 63 ++++++++++++++++++++
 .../spark/structure/io/gryo/GryoSerializer.java | 33 +++++-----
 .../io/gryo/ObjectWritableSerializer.java       | 43 +++++++++++++
 .../structure/io/gryo/Tuple2Serializer.java     | 46 ++++++++++++++
 .../structure/io/gryo/Tuple3Serializer.java     | 47 +++++++++++++++
 .../io/gryo/VertexWritableSerializer.java       | 42 +++++++++++++
 .../io/gryo/WrappedArraySerializer.java         | 16 +++--
 7 files changed, 269 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
new file mode 100644
index 0000000..be491c4
--- /dev/null
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
+
+import org.apache.spark.util.collection.CompactBuffer
+import org.apache.tinkerpop.shaded.kryo.Kryo
+import org.apache.tinkerpop.shaded.kryo.Serializer
+import org.apache.tinkerpop.shaded.kryo.io.Input
+import org.apache.tinkerpop.shaded.kryo.io.Output
+import scala.reflect.ClassTag
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class CompactBufferSerializer<T> extends Serializer<CompactBuffer<T>> {
+
+    /*
+     * Writes the private state of Spark's CompactBuffer field by field. This class is written
+     * in Groovy, which (unlike Java) can access those private members: evidence$1 (the
+     * ClassTag), element0, element1, curSize and the otherElements array.
+     * read() must consume the values in exactly the order write() produces them.
+     * There is no flush() between fields: Kryo's Output is buffered, so forcing a flush
+     * per field only adds small writes to any underlying stream.
+     */
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final CompactBuffer<T> compactBuffer) {
+        kryo.writeClassAndObject(output, compactBuffer.evidence$1);
+        kryo.writeClassAndObject(output, compactBuffer.element0);
+        kryo.writeClassAndObject(output, compactBuffer.element1);
+        output.writeVarInt(compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize, true);
+        kryo.writeClassAndObject(output, compactBuffer.otherElements);
+    }
+
+    @Override
+    public CompactBuffer<T> read(final Kryo kryo, final Input input, final Class<CompactBuffer<T>> aClass) {
+        // the ClassTag comes first because the CompactBuffer constructor requires it
+        final ClassTag<T> classTag = kryo.readClassAndObject(input);
+        final CompactBuffer<T> compactBuffer = new CompactBuffer<>(classTag);
+        compactBuffer.element0 = kryo.readClassAndObject(input);
+        compactBuffer.element1 = kryo.readClassAndObject(input);
+        compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize = input.readVarInt(true);
+        compactBuffer.otherElements = kryo.readClassAndObject(input);
+        return compactBuffer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index a66b146..9c546fe 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -22,7 +22,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
 
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SerializableWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.python.PythonBroadcast;
 import org.apache.spark.broadcast.HttpBroadcast;
@@ -31,6 +30,7 @@ import org.apache.spark.scheduler.CompressedMapStatus;
 import org.apache.spark.scheduler.HighlyCompressedMapStatus;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.storage.BlockManagerId;
 import org.apache.spark.util.SerializableConfiguration;
 import org.apache.spark.util.collection.CompactBuffer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
@@ -81,27 +81,28 @@ public final class GryoSerializer extends Serializer {
                 ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
-                        builder.addCustom(SerializableWritable.class, new JavaSerializer())
-                                .addCustom(Tuple2.class, new JavaSerializer())
-                                .addCustom(Tuple2[].class, new JavaSerializer())
-                                .addCustom(Tuple3.class, new JavaSerializer())
-                                .addCustom(Tuple3[].class, new JavaSerializer())
-                                .addCustom(CompactBuffer.class, new JavaSerializer())
-                                .addCustom(CompactBuffer[].class, new JavaSerializer())
-                                .addCustom(CompressedMapStatus.class, new JavaSerializer())
-                                .addCustom(HighlyCompressedMapStatus.class, new JavaSerializer())
-                                .addCustom(HttpBroadcast.class, new JavaSerializer())
-                                .addCustom(PythonBroadcast.class, new JavaSerializer())
-                                .addCustom(BoxedUnit.class, new JavaSerializer())
+                        builder.addCustom(Tuple2.class, new Tuple2Serializer())
+                                .addCustom(Tuple2[].class)
+                                .addCustom(Tuple3.class, new Tuple3Serializer())
+                                .addCustom(Tuple3[].class)
+                                .addCustom(CompactBuffer.class, new CompactBufferSerializer())
+                                .addCustom(CompactBuffer[].class)
+                                .addCustom(CompressedMapStatus.class)
+                                .addCustom(HighlyCompressedMapStatus.class)
+                                .addCustom(HttpBroadcast.class)
+                                .addCustom(PythonBroadcast.class)
+                                .addCustom(BoxedUnit.class)
+                                .addCustom(BlockManagerId.class)
                                 .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
+                                .addCustom(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer())
                                 .addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer())
                                 .addCustom(MessagePayload.class)
                                 .addCustom(ViewIncomingPayload.class)
                                 .addCustom(ViewOutgoingPayload.class)
                                 .addCustom(ViewPayload.class)
-                                .addCustom(SerializableConfiguration.class, new JavaSerializer())
-                                .addCustom(VertexWritable.class, new JavaSerializer())
-                                .addCustom(ObjectWritable.class, new JavaSerializer())
+                                .addCustom(SerializableConfiguration.class)
+                                .addCustom(VertexWritable.class, new VertexWritableSerializer())
+                                .addCustom(ObjectWritable.class, new ObjectWritableSerializer())
                                 .referenceTracking(referenceTracking)
                                 .registrationRequired(registrationRequired);
                         // add these as we find ClassNotFoundExceptions

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
new file mode 100644
index 0000000..21cbc60
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ObjectWritableSerializer<T> extends Serializer<ObjectWritable<T>> {
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final ObjectWritable<T> objectWritable) {
+        kryo.writeClassAndObject(output, objectWritable.get()); // value's class, then value; no flush(): Output is buffered
+    }
+
+    @Override
+    @SuppressWarnings("unchecked") // the value's class was written by write(); T itself is erased
+    public ObjectWritable<T> read(final Kryo kryo, final Input input, final Class<ObjectWritable<T>> clazz) {
+        return new ObjectWritable<>((T) kryo.readClassAndObject(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
new file mode 100644
index 0000000..05d0b9e
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class Tuple2Serializer<A, B> extends Serializer<Tuple2<A, B>> {
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final Tuple2<A, B> tuple2) {
+        kryo.writeClassAndObject(output, tuple2._1()); // no per-element flush(): Output is buffered
+        kryo.writeClassAndObject(output, tuple2._2());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked") // element classes are written by write(); A and B are erased
+    public Tuple2<A, B> read(final Kryo kryo, final Input input, final Class<Tuple2<A, B>> clazz) {
+        final A first = (A) kryo.readClassAndObject(input); // _1 precedes _2 in the stream
+        return new Tuple2<>(first, (B) kryo.readClassAndObject(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
new file mode 100644
index 0000000..f188794
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Tuple3;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class Tuple3Serializer<A, B, C> extends Serializer<Tuple3<A, B, C>> {
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final Tuple3<A, B, C> tuple3) {
+        kryo.writeClassAndObject(output, tuple3._1()); // no per-element flush(): Output is buffered
+        kryo.writeClassAndObject(output, tuple3._2());
+        kryo.writeClassAndObject(output, tuple3._3());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked") // element classes are written by write(); A, B and C are erased
+    public Tuple3<A, B, C> read(final Kryo kryo, final Input input, final Class<Tuple3<A, B, C>> clazz) {
+        final A first = (A) kryo.readClassAndObject(input); // same order as write(): _1, _2, _3
+        final B second = (B) kryo.readClassAndObject(input);
+        return new Tuple3<>(first, second, (C) kryo.readClassAndObject(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
new file mode 100644
index 0000000..97891f3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexWritableSerializer extends Serializer<VertexWritable> {
+    @Override
+    public void write(final Kryo kryo, final Output output, final VertexWritable vertexWritable) {
+        kryo.writeObject(output, vertexWritable.get().graph()); // the vertex's star graph, written without a class id
+    }
+    @Override
+    public VertexWritable read(final Kryo kryo, final Input input, final Class<VertexWritable> aClass) {
+        final StarGraph starGraph = kryo.readObject(input, StarGraph.class); // class is implied, so read it explicitly
+        return new VertexWritable(starGraph.getStarVertex());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
index 0e9f03f..7cfbf11 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -26,9 +26,6 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
 import scala.collection.JavaConversions;
 import scala.collection.mutable.WrappedArray;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -36,11 +33,20 @@ public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>>
 
     @Override
     public void write(final Kryo kryo, final Output output, final WrappedArray<T> iterable) {
-        kryo.writeClassAndObject(output,new ArrayList<>(JavaConversions.asJavaList(iterable)));
+        // element count first so read() can size its array; no per-element flush(): Output is buffered
+        output.writeVarInt(iterable.size(), true);
+        for (final T element : JavaConversions.asJavaList(iterable)) {
+            kryo.writeClassAndObject(output, element);
+        }
     }
 
     @Override
     public WrappedArray<T> read(final Kryo kryo, final Input input, final Class<WrappedArray<T>> aClass) {
-        return new WrappedArray.ofRef<>((T[]) ((List<T>) kryo.readClassAndObject(input)).toArray());
+        final Object[] elements = new Object[input.readVarInt(true)]; // count was written first by write()
+        int index = 0;
+        while (index < elements.length) {
+            elements[index++] = kryo.readClassAndObject(input);
+        }
+        return new WrappedArray.ofRef<>((T[]) elements);
     }
 }