You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/08 13:33:09 UTC
[09/50] [abbrv] incubator-tinkerpop git commit: Introduce Kryo shim
to support serializer reuse
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
new file mode 100644
index 0000000..d5ba90d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.structure.util.star;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+
+public class StarGraphSerializer implements SerializerShim<StarGraph> {
+
+ private final Direction edgeDirectionToSerialize;
+ private GraphFilter graphFilter;
+
+ private final static byte VERSION_1 = Byte.MIN_VALUE;
+
+ public StarGraphSerializer(final Direction edgeDirectionToSerialize, final GraphFilter graphFilter) {
+ this.edgeDirectionToSerialize = edgeDirectionToSerialize;
+ this.graphFilter = graphFilter;
+ }
+
+ @Override
+ public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph) {
+ output.writeByte(VERSION_1);
+ kryo.writeObjectOrNull(output, starGraph.edgeProperties, HashMap.class);
+ kryo.writeObjectOrNull(output, starGraph.metaProperties, HashMap.class);
+ kryo.writeClassAndObject(output, starGraph.starVertex.id);
+ kryo.writeObject(output, starGraph.starVertex.label);
+ writeEdges(kryo, output, starGraph, Direction.IN);
+ writeEdges(kryo, output, starGraph, Direction.OUT);
+ kryo.writeObject(output, null != starGraph.starVertex.vertexProperties);
+ if (null != starGraph.starVertex.vertexProperties) {
+ kryo.writeObject(output, starGraph.starVertex.vertexProperties.size());
+ for (final Map.Entry<String, List<VertexProperty>> vertexProperties : starGraph.starVertex.vertexProperties.entrySet()) {
+ kryo.writeObject(output, vertexProperties.getKey());
+ kryo.writeObject(output, vertexProperties.getValue().size());
+ for (final VertexProperty vertexProperty : vertexProperties.getValue()) {
+ kryo.writeClassAndObject(output, vertexProperty.id());
+ kryo.writeClassAndObject(output, vertexProperty.value());
+ }
+ }
+ }
+ }
+
+ /**
+ * If the returned {@link StarGraph} is null, that means that the {@link GraphFilter} filtered the vertex.
+ */
+ @Override
+ public <I extends InputShim> StarGraph read(KryoShim<I, ?> kryo, I input, Class<StarGraph> clazz) {
+ final StarGraph starGraph = StarGraph.open();
+ input.readByte(); // version field ignored for now - for future use with backward compatibility
+ starGraph.edgeProperties = kryo.readObjectOrNull(input, HashMap.class);
+ starGraph.metaProperties = kryo.readObjectOrNull(input, HashMap.class);
+ starGraph.addVertex(T.id, kryo.readClassAndObject(input), T.label, kryo.readObject(input, String.class));
+ readEdges(kryo, input, starGraph, Direction.IN);
+ readEdges(kryo, input, starGraph, Direction.OUT);
+ if (kryo.readObject(input, Boolean.class)) {
+ final int numberOfUniqueKeys = kryo.readObject(input, Integer.class);
+ for (int i = 0; i < numberOfUniqueKeys; i++) {
+ final String vertexPropertyKey = kryo.readObject(input, String.class);
+ final int numberOfVertexPropertiesWithKey = kryo.readObject(input, Integer.class);
+ for (int j = 0; j < numberOfVertexPropertiesWithKey; j++) {
+ final Object id = kryo.readClassAndObject(input);
+ final Object value = kryo.readClassAndObject(input);
+ starGraph.starVertex.property(VertexProperty.Cardinality.list, vertexPropertyKey, value, T.id, id);
+ }
+ }
+ }
+ return this.graphFilter.hasFilter() ? starGraph.applyGraphFilter(this.graphFilter).orElse(null) : starGraph;
+ }
+
+ private <O extends OutputShim> void writeEdges(final KryoShim<?, O> kryo, final O output, final StarGraph starGraph, final Direction direction) {
+ // only write edges if there are some AND if the user requested them to be serialized AND if they match
+ // the direction being serialized by the format
+ final Map<String, List<Edge>> starEdges = direction.equals(Direction.OUT) ? starGraph.starVertex.outEdges : starGraph.starVertex.inEdges;
+ final boolean writeEdges = null != starEdges && edgeDirectionToSerialize != null
+ && (edgeDirectionToSerialize == direction || edgeDirectionToSerialize == Direction.BOTH);
+ kryo.writeObject(output, writeEdges);
+ if (writeEdges) {
+ kryo.writeObject(output, starEdges.size());
+ for (final Map.Entry<String, List<Edge>> edges : starEdges.entrySet()) {
+ kryo.writeObject(output, edges.getKey());
+ kryo.writeObject(output, edges.getValue().size());
+ for (final Edge edge : edges.getValue()) {
+ kryo.writeClassAndObject(output, edge.id());
+ kryo.writeClassAndObject(output, direction.equals(Direction.OUT) ? edge.inVertex().id() : edge.outVertex().id());
+ }
+ }
+ }
+ }
+
+ private <I extends InputShim> void readEdges(final KryoShim<I, ?> kryo, final I input, final StarGraph starGraph, final Direction direction) {
+ if (kryo.readObject(input, Boolean.class)) {
+ final int numberOfUniqueLabels = kryo.readObject(input, Integer.class);
+ for (int i = 0; i < numberOfUniqueLabels; i++) {
+ final String edgeLabel = kryo.readObject(input, String.class);
+ final int numberOfEdgesWithLabel = kryo.readObject(input, Integer.class);
+ for (int j = 0; j < numberOfEdgesWithLabel; j++) {
+ final Object edgeId = kryo.readClassAndObject(input);
+ final Object adjacentVertexId = kryo.readClassAndObject(input);
+ if (this.graphFilter.checkEdgeLegality(direction, edgeLabel).positive()) {
+ if (direction.equals(Direction.OUT))
+ starGraph.starVertex.addOutEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+ else
+ starGraph.starVertex.addInEdge(edgeLabel, starGraph.addVertex(T.id, adjacentVertexId), T.id, edgeId);
+ } else if (null != starGraph.edgeProperties) {
+ starGraph.edgeProperties.remove(edgeId);
+ }
+ }
+ }
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
new file mode 100644
index 0000000..2053280
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService;
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * {@link KryoShimService} backed by the shaded-Kryo pool that {@link HadoopPools} provides. Each call borrows a
+ * {@link Kryo} instance from that pool and hands it back when the call finishes, whether or not it succeeded.
+ */
+public class HadoopPoolShimService implements KryoShimService {
+
+    /**
+     * Deserializes one class-tagged object from {@code source}.
+     * <p>
+     * NOTE: Kryo's {@link Input} reads {@code source} through its own buffer, so it may consume bytes past the end of
+     * the object; pass a stream that holds only the one serialized object.
+     *
+     * @param source stream positioned at a class-tagged object
+     * @return the deserialized object
+     */
+    @Override
+    public Object readClassAndObject(final InputStream source) {
+        // resolve the pool once so the Kryo is offered back to the pool it was taken from, even if HadoopPools
+        // reassigns its pool (re-initialization) while this call is in flight
+        final GryoPool pool = HadoopPools.getGryoPool();
+        final Kryo kryo = pool.takeKryo();
+        try {
+            return kryo.readClassAndObject(new Input(source));
+        } finally {
+            pool.offerKryo(kryo);
+        }
+    }
+
+    /**
+     * Serializes {@code o}, tagged with its class, to {@code sink} and flushes the Kryo output buffer into it.
+     * The sink itself is not closed.
+     *
+     * @param o    object to serialize
+     * @param sink destination stream
+     */
+    @Override
+    public void writeClassAndObject(final Object o, final OutputStream sink) {
+        final GryoPool pool = HadoopPools.getGryoPool(); // resolved once, see readClassAndObject
+        final Kryo kryo = pool.takeKryo();
+        try {
+            final Output output = new Output(sink);
+            kryo.writeClassAndObject(output, o);
+            output.flush();
+        } finally {
+            pool.offerKryo(kryo);
+        }
+    }
+
+    /**
+     * @return {@code 0}; how this value is compared against other services is up to the shim service loader
+     */
+    @Override
+    public int getPriority() {
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index f09f703..5074ad5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -43,6 +43,7 @@ public final class HadoopPools {
GRYO_POOL = GryoPool.build().
poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+ initializeMapper(m -> m.registrationRequired(false)).
create();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index 9a07f75..e7a38a5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -63,29 +65,14 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
@Override
public void readFields(final DataInput input) throws IOException {
- this.t = HadoopPools.getGryoPool().doWithReader(gryoReader -> {
- try {
- // class argument is Object because gryo doesn't really care that we don't know the specific type.
- // the type is embedded in the stream so it can just read it from there and return it as needed.
- // presumably that will cast nicely to T
- return (T) gryoReader.readObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)), Object.class);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
+        final byte[] serialized = WritableUtils.readCompressedByteArray(input);
+        this.t = KryoShimServiceLoader.readClassAndObject(new ByteArrayInputStream(serialized)); // class tag is in the stream, no type hint needed
}
@Override
public void write(final DataOutput output) throws IOException {
- HadoopPools.getGryoPool().doWithWriter(gryoWriter -> {
- try {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- gryoWriter.writeObject(outputStream, this.t);
- WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
+        final byte[] serialized = KryoShimServiceLoader.writeClassAndObjectToBytes(this.t); // class-tagged, so readFields needs no type hint
+        WritableUtils.writeCompressedByteArray(output, serialized);
}
private void writeObject(final ObjectOutputStream outputStream) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index ac360e9..7ac8e8c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
@@ -60,42 +61,15 @@ public final class VertexWritable implements Writable, Serializable {
@Override
public void readFields(final DataInput input) throws IOException {
- try {
- this.vertex = null;
- this.vertex = HadoopPools.getGryoPool().doWithReader(gryoReader -> {
- try {
- final ByteArrayInputStream inputStream = new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input));
- return gryoReader.readObject(inputStream, StarGraph.class).getStarVertex(); // read the star graph
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
- } catch (final IllegalStateException e) {
- if (e.getCause() instanceof IOException)
- throw (IOException) e.getCause();
- else
- throw e;
- }
+        this.vertex = null; // cleared first so a failed read does not leave the previous vertex in place
+        final StarGraph starGraph = (StarGraph) KryoShimServiceLoader.readClassAndObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)));
+        this.vertex = starGraph.getStarVertex(); // the payload is the vertex's whole star graph
}
@Override
public void write(final DataOutput output) throws IOException {
- try {
- HadoopPools.getGryoPool().doWithWriter(gryoWriter -> {
- try {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- gryoWriter.writeObject(outputStream, this.vertex.graph()); // write the star graph
- WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
- } catch (final IllegalStateException e) {
- if (e.getCause() instanceof IOException)
- throw (IOException) e.getCause();
- else
- throw e;
- }
+        final byte[] serialized = KryoShimServiceLoader.writeClassAndObjectToBytes(this.vertex.graph()); // write the star graph
+        WritableUtils.writeCompressedByteArray(output, serialized);
}
private void writeObject(final ObjectOutputStream outputStream) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ef528697/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
new file mode 100644
index 0000000..0b27e72
--- /dev/null
+++ b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo