You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/08 13:33:09 UTC

[09/50] [abbrv] incubator-tinkerpop git commit: Introduce Kryo shim to support serializer reuse

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
new file mode 100644
index 0000000..d5ba90d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.util.star;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+
+public class StarGraphSerializer implements SerializerShim<StarGraph> {
+
+    private final Direction edgeDirectionToSerialize;
+    private GraphFilter graphFilter;
+
+    private final static byte VERSION_1 = Byte.MIN_VALUE;
+
+    public StarGraphSerializer(final Direction edgeDirectionToSerialize, final GraphFilter graphFilter) {
+        this.edgeDirectionToSerialize = edgeDirectionToSerialize;
+        this.graphFilter = graphFilter;
+    }
+
+    @Override
+    public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph) {
+        output.writeByte(VERSION_1);
+        kryo.writeObjectOrNull(output, starGraph.edgeProperties, HashMap.class);
+        kryo.writeObjectOrNull(output, starGraph.metaProperties, HashMap.class);
+        kryo.writeClassAndObject(output, starGraph.starVertex.id);
+        kryo.writeObject(output, starGraph.starVertex.label);
+        writeEdges(kryo, output, starGraph, Direction.IN);
+        writeEdges(kryo, output, starGraph, Direction.OUT);
+        kryo.writeObject(output, null != starGraph.starVertex.vertexProperties);
+        if (null != starGraph.starVertex.vertexProperties) {
+            kryo.writeObject(output, starGraph.starVertex.vertexProperties.size());
+            for (final Map.Entry<String, List<VertexProperty>> vertexProperties : starGraph.starVertex.vertexProperties.entrySet()) {
+                kryo.writeObject(output, vertexProperties.getKey());
+                kryo.writeObject(output, vertexProperties.getValue().size());
+                for (final VertexProperty vertexProperty : vertexProperties.getValue()) {
+                    kryo.writeClassAndObject(output, vertexProperty.id());
+                    kryo.writeClassAndObject(output, vertexProperty.value());
+                }
+            }
+        }
+    }
+
+    /**
+     * If the returned {@link StarGraph} is null, that means that the {@link GraphFilter} filtered the vertex.
+     */
+    @Override
+    public <I extends InputShim> StarGraph read(KryoShim<I, ?> kryo, I input, Class<StarGraph> clazz) {
+        final StarGraph starGraph = StarGraph.open();
+        input.readByte();  // version field ignored for now - for future use with backward compatibility
+        starGraph.edgeProperties = kryo.readObjectOrNull(input, HashMap.class);
+        starGraph.metaProperties = kryo.readObjectOrNull(input, HashMap.class);
+        starGraph.addVertex(T.id, kryo.readClassAndObject(input), T.label, kryo.readObject(input, String.class));
+        readEdges(kryo, input, starGraph, Direction.IN);
+        readEdges(kryo, input, starGraph, Direction.OUT);
+        if (kryo.readObject(input, Boolean.class)) {
+            final int numberOfUniqueKeys = kryo.readObject(input, Integer.class);
+            for (int i = 0; i < numberOfUniqueKeys; i++) {
+                final String vertexPropertyKey = kryo.readObject(input, String.class);
+                final int numberOfVertexPropertiesWithKey = kryo.readObject(input, Integer.class);
+                for (int j = 0; j < numberOfVertexPropertiesWithKey; j++) {
+                    final Object id = kryo.readClassAndObject(input);
+                    final Object value = kryo.readClassAndObject(input);
+                    starGraph.starVertex.property(VertexProperty.Cardinality.list, vertexPropertyKey, value, T.id, id);
+                }
+            }
+        }
+        return this.graphFilter.hasFilter() ? starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph;
+    }
+
+    private <O extends OutputShim> void writeEdges(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph, final Direction direction) {
+        // only write edges if there are some AND if the user requested them to be serialized AND if they match
+        // the direction being serialized by the format
+        final Map<String, List<Edge>> starEdges = direction.equals(Direction.OUT) ? starGraph.starVertex.outEdges : starGraph.starVertex.inEdges;
+        final boolean writeEdges = null != starEdges && edgeDirectionToSerialize != null
+                && (edgeDirectionToSerialize == direction || edgeDirectionToSerialize == Direction.BOTH);
+        kryo.writeObject(output, writeEdges);
+        if (writeEdges) {
+            kryo.writeObject(output, starEdges.size());
+            for (final Map.Entry<String, List<Edge>> edges : starEdges.entrySet()) {
+                kryo.writeObject(output, edges.getKey());
+                kryo.writeObject(output, edges.getValue().size());
+                for (final Edge edge : edges.getValue()) {
+                    kryo.writeClassAndObject(output, edge.id());
+                    kryo.writeClassAndObject(output, direction.equals(Direction.OUT) ? edge.inVertex().id() : edge.outVertex().id());
+                }
+            }
+        }
+    }
+
+    private <I extends InputShim> void readEdges(final KryoShim<I, ?> kryo, final I input, final StarGraph starGraph, final Direction direction) {
+        if (kryo.readObject(input, Boolean.class)) {
+            final int numberOfUniqueLabels = kryo.readObject(input, Integer.class);
+            for (int i = 0; i < numberOfUniqueLabels; i++) {
+                final String edgeLabel = kryo.readObject(input, String.class);
+                final int numberOfEdgesWithLabel = kryo.readObject(input, Integer.class);
+                for (int j = 0; j < numberOfEdgesWithLabel; j++) {
+                    final Object edgeId = kryo.readClassAndObject(input);
+                    final Object adjacentVertexId = kryo.readClassAndObject(input);
+                    if (this.graphFilter.checkEdgeLegality(direction, edgeLabel).positive()) {
+                        if (direction.equals(Direction.OUT))
+                            starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+                        else
+                            starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+                    } else if (null != starGraph.edgeProperties) {
+                        starGraph.edgeProperties.remove(edgeId);
+                    }
+                }
+            }
+        }
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
new file mode 100644
index 0000000..2053280
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class HadoopPoolShimService implements KryoShimService {
+
+    public Object readClassAndObject(InputStream source) {
+
+        Kryo k = null;
+
+        try {
+            k = HadoopPools.getGryoPool().takeKryo();
+
+            return k.readClassAndObject(new Input(source));
+        } finally {
+            if (null != k) {
+                HadoopPools.getGryoPool().offerKryo(k);
+            }
+        }
+    }
+
+    public void writeClassAndObject(Object o, OutputStream sink) {
+
+        Kryo k = null;
+
+        try {
+            k = HadoopPools.getGryoPool().takeKryo();
+
+            Output output = new Output(sink);
+
+            k.writeClassAndObject(output, o);
+
+            output.flush();
+        } finally {
+            if (null != k) {
+                HadoopPools.getGryoPool().offerKryo(k);
+            }
+        }
+    }
+
+    @Override
+    public int getPriority() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index f09f703..5074ad5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -43,6 +43,7 @@ public final class HadoopPools {
             GRYO_POOL = GryoPool.build().
                     poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
                     ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+                    initializeMapper(m -> m.registrationRequired(false)).
                     create();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index 9a07f75..e7a38a5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -63,29 +65,14 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
 
     @Override
     public void readFields(final DataInput input) throws IOException {
-        this.t = HadoopPools.getGryoPool().doWithReader(gryoReader -> {
-            try {
-                // class argument is Object because gryo doesn't really care that we don't know the specific type.
-                // the type is embedded in the stream so it can just read it from there and return it as needed.
-                // presumably that will cast nicely to T
-                return (T) gryoReader.readObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)), Object.class);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        });
+        ByteArrayInputStream bais = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
+        this.t = KryoShimServiceLoader.readClassAndObject(bais);
     }
 
     @Override
     public void write(final DataOutput output) throws IOException {
-        HadoopPools.getGryoPool().doWithWriter(gryoWriter -> {
-            try {
-                final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                gryoWriter.writeObject(outputStream, this.t);
-                WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        });
+        byte serialized[] = KryoShimServiceLoader.writeClassAndObjectToBytes(this.t);
+        WritableUtils.writeCompressedByteArray(output, serialized);
     }
 
     private void writeObject(final ObjectOutputStream outputStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index ac360e9..7ac8e8c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 
@@ -60,42 +61,15 @@ public final class VertexWritable implements Writable, Serializable {
 
     @Override
     public void readFields(final DataInput input) throws IOException {
-        try {
-            this.vertex = null;
-            this.vertex = HadoopPools.getGryoPool().doWithReader(gryoReader -> {
-                try {
-                    final ByteArrayInputStream inputStream = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
-                    return gryoReader.readObject(inputStream, StarGraph.class).getStarVertex(); // read the star graph
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            });
-        } catch (final IllegalStateException e) {
-            if (e.getCause() instanceof IOException)
-                throw (IOException) e.getCause();
-            else
-                throw e;
-        }
+        this.vertex = null;
+        ByteArrayInputStream bais = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
+        this.vertex = ((StarGraph)KryoShimServiceLoader.readClassAndObject(bais)).getStarVertex(); // read the star graph;
     }
 
     @Override
     public void write(final DataOutput output) throws IOException {
-        try {
-            HadoopPools.getGryoPool().doWithWriter(gryoWriter -> {
-                try {
-                    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                    gryoWriter.writeObject(outputStream, this.vertex.graph()); // write the star graph
-                    WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            });
-        } catch (final IllegalStateException e) {
-            if (e.getCause() instanceof IOException)
-                throw (IOException) e.getCause();
-            else
-                throw e;
-        }
+        byte serialized[] = KryoShimServiceLoader.writeClassAndObjectToBytes(this.vertex.graph());
+        WritableUtils.writeCompressedByteArray(output, serialized);
     }
 
     private void writeObject(final ObjectOutputStream outputStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
new file mode 100644
index 0000000..0b27e72
--- /dev/null
+++ b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo