You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/03 19:03:21 UTC
incubator-tinkerpop git commit: Created specialized serializers for
common classes in Spark to avoid the overhead of JavaSerialization.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-962 f7ad5c4f6 -> b824d0c09
Created specialized serializers for common classes in Spark to avoid the overhead of JavaSerialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b824d0c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b824d0c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b824d0c0
Branch: refs/heads/TINKERPOP-962
Commit: b824d0c0994276e3714dc59341aa24526127eafe
Parents: f7ad5c4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Feb 3 11:03:23 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Feb 3 11:03:23 2016 -0700
----------------------------------------------------------------------
.../io/gryo/CompactBufferSerializer.groovy | 63 ++++++++++++++++++++
.../spark/structure/io/gryo/GryoSerializer.java | 33 +++++-----
.../io/gryo/ObjectWritableSerializer.java | 43 +++++++++++++
.../structure/io/gryo/Tuple2Serializer.java | 46 ++++++++++++++
.../structure/io/gryo/Tuple3Serializer.java | 47 +++++++++++++++
.../io/gryo/VertexWritableSerializer.java | 42 +++++++++++++
.../io/gryo/WrappedArraySerializer.java | 16 +++--
7 files changed, 269 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
new file mode 100644
index 0000000..be491c4
--- /dev/null
+++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo
+
+import org.apache.spark.util.collection.CompactBuffer
+import org.apache.tinkerpop.shaded.kryo.Kryo
+import org.apache.tinkerpop.shaded.kryo.Serializer
+import org.apache.tinkerpop.shaded.kryo.io.Input
+import org.apache.tinkerpop.shaded.kryo.io.Output
+import scala.reflect.ClassTag
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class CompactBufferSerializer<T> extends Serializer<CompactBuffer<T>> {
+
+ /*
+ private final ClassTag<T> evidence$1;
+ private T element0;
+ private T element1;
+ private int org$apache$spark$util$collection$CompactBuffer$$curSize;
+ private Object otherElements;
+ */
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final CompactBuffer<T> compactBuffer) {
+ kryo.writeClassAndObject(output, compactBuffer.evidence$1);
+ kryo.writeClassAndObject(output, compactBuffer.element0);
+ kryo.writeClassAndObject(output, compactBuffer.element1);
+ output.flush();
+ output.writeVarInt(compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize, true);
+ kryo.writeClassAndObject(output, compactBuffer.otherElements);
+ output.flush();
+ }
+
+ @Override
+ public CompactBuffer<T> read(Kryo kryo, Input input, Class<CompactBuffer<T>> aClass) {
+ final ClassTag<T> classTag = kryo.readClassAndObject(input);
+ final CompactBuffer<T> compactBuffer = new CompactBuffer<>(classTag);
+ compactBuffer.element0 = kryo.readClassAndObject(input);
+ compactBuffer.element1 = kryo.readClassAndObject(input);
+ compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize = input.readVarInt(true);
+ compactBuffer.otherElements = kryo.readClassAndObject(input);
+ return compactBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index a66b146..9c546fe 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -22,7 +22,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SerializableWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.python.PythonBroadcast;
import org.apache.spark.broadcast.HttpBroadcast;
@@ -31,6 +30,7 @@ import org.apache.spark.scheduler.CompressedMapStatus;
import org.apache.spark.scheduler.HighlyCompressedMapStatus;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.storage.BlockManagerId;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.collection.CompactBuffer;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
@@ -81,27 +81,28 @@ public final class GryoSerializer extends Serializer {
ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
initializeMapper(builder -> {
try {
- builder.addCustom(SerializableWritable.class, new JavaSerializer())
- .addCustom(Tuple2.class, new JavaSerializer())
- .addCustom(Tuple2[].class, new JavaSerializer())
- .addCustom(Tuple3.class, new JavaSerializer())
- .addCustom(Tuple3[].class, new JavaSerializer())
- .addCustom(CompactBuffer.class, new JavaSerializer())
- .addCustom(CompactBuffer[].class, new JavaSerializer())
- .addCustom(CompressedMapStatus.class, new JavaSerializer())
- .addCustom(HighlyCompressedMapStatus.class, new JavaSerializer())
- .addCustom(HttpBroadcast.class, new JavaSerializer())
- .addCustom(PythonBroadcast.class, new JavaSerializer())
- .addCustom(BoxedUnit.class, new JavaSerializer())
+ builder.addCustom(Tuple2.class, new Tuple2Serializer())
+ .addCustom(Tuple2[].class)
+ .addCustom(Tuple3.class, new Tuple3Serializer())
+ .addCustom(Tuple3[].class)
+ .addCustom(CompactBuffer.class, new CompactBufferSerializer())
+ .addCustom(CompactBuffer[].class)
+ .addCustom(CompressedMapStatus.class)
+ .addCustom(HighlyCompressedMapStatus.class)
+ .addCustom(HttpBroadcast.class)
+ .addCustom(PythonBroadcast.class)
+ .addCustom(BoxedUnit.class)
+ .addCustom(BlockManagerId.class)
.addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
+ .addCustom(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer())
.addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer())
.addCustom(MessagePayload.class)
.addCustom(ViewIncomingPayload.class)
.addCustom(ViewOutgoingPayload.class)
.addCustom(ViewPayload.class)
- .addCustom(SerializableConfiguration.class, new JavaSerializer())
- .addCustom(VertexWritable.class, new JavaSerializer())
- .addCustom(ObjectWritable.class, new JavaSerializer())
+ .addCustom(SerializableConfiguration.class)
+ .addCustom(VertexWritable.class, new VertexWritableSerializer())
+ .addCustom(ObjectWritable.class, new ObjectWritableSerializer())
.referenceTracking(referenceTracking)
.registrationRequired(registrationRequired);
// add these as we find ClassNotFoundExceptions
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
new file mode 100644
index 0000000..21cbc60
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ObjectWritableSerializer<T> extends Serializer<ObjectWritable<T>> {
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final ObjectWritable<T> objectWritable) {
+ kryo.writeClassAndObject(output, objectWritable.get());
+ output.flush();
+ }
+
+ @Override
+ public ObjectWritable<T> read(final Kryo kryo, final Input input, final Class<ObjectWritable<T>> clazz) {
+ return new ObjectWritable(kryo.readClassAndObject(input));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
new file mode 100644
index 0000000..05d0b9e
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class Tuple2Serializer<A, B> extends Serializer<Tuple2<A, B>> {
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final Tuple2<A, B> tuple2) {
+ kryo.writeClassAndObject(output, tuple2._1());
+ output.flush();
+ kryo.writeClassAndObject(output, tuple2._2());
+ output.flush();
+ }
+
+ @Override
+ public Tuple2<A, B> read(final Kryo kryo, final Input input, final Class<Tuple2<A, B>> clazz) {
+ return new Tuple2(kryo.readClassAndObject(input), kryo.readClassAndObject(input));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
new file mode 100644
index 0000000..f188794
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Tuple3;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class Tuple3Serializer<A, B, C> extends Serializer<Tuple3<A, B, C>> {
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final Tuple3<A, B, C> tuple3) {
+ kryo.writeClassAndObject(output, tuple3._1());
+ output.flush();
+ kryo.writeClassAndObject(output, tuple3._2());
+ output.flush();
+ kryo.writeClassAndObject(output, tuple3._3());
+ output.flush();
+ }
+
+ @Override
+ public Tuple3<A, B, C> read(final Kryo kryo, final Input input, final Class<Tuple3<A, B, C>> clazz) {
+ return new Tuple3(kryo.readClassAndObject(input), kryo.readClassAndObject(input), kryo.readClassAndObject(input));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
new file mode 100644
index 0000000..97891f3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexWritableSerializer extends Serializer<VertexWritable> {
+ @Override
+ public void write(final Kryo kryo, final Output output, final VertexWritable vertexWritable) {
+ kryo.writeObject(output, vertexWritable.get().graph());
+ }
+
+ @Override
+ public VertexWritable read(final Kryo kryo, final Input input, final Class<VertexWritable> aClass) {
+ return new VertexWritable(kryo.readObject(input, StarGraph.class).getStarVertex());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b824d0c0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
index 0e9f03f..7cfbf11 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -26,9 +26,6 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -36,11 +33,20 @@ public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>>
@Override
public void write(final Kryo kryo, final Output output, final WrappedArray<T> iterable) {
- kryo.writeClassAndObject(output,new ArrayList<>(JavaConversions.asJavaList(iterable)));
+ output.writeVarInt(iterable.size(), true);
+ JavaConversions.asJavaList(iterable).forEach(t -> {
+ kryo.writeClassAndObject(output, t);
+ output.flush();
+ });
}
@Override
public WrappedArray<T> read(final Kryo kryo, final Input input, final Class<WrappedArray<T>> aClass) {
- return new WrappedArray.ofRef<>((T[]) ((List<T>) kryo.readClassAndObject(input)).toArray());
+ final int size = input.readVarInt(true);
+ final Object[] array = new Object[size];
+ for (int i = 0; i < size; i++) {
+ array[i] = kryo.readClassAndObject(input);
+ }
+ return new WrappedArray.ofRef<>((T[]) array);
}
}