You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/07 13:14:49 UTC
[06/18] incubator-tinkerpop git commit: Kryo shim configuration tweaks
Kryo shim configuration tweaks
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9321a3e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9321a3e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9321a3e1
Branch: refs/heads/master
Commit: 9321a3e14eab4ed05f8ef5f4c77de481a4011b81
Parents: 218d790
Author: Dan LaRocque <da...@hopcount.org>
Authored: Mon Jun 6 02:24:12 2016 -0400
Committer: Dan LaRocque <da...@hopcount.org>
Committed: Mon Jun 6 03:10:03 2016 -0400
----------------------------------------------------------------------
.../process/computer/GiraphWorkerContext.java | 3 +-
.../gremlin/structure/io/gryo/GryoMapper.java | 30 ++-
.../gremlin/structure/io/gryo/GryoPool.java | 1 +
.../structure/io/gryo/GryoSerializers.java | 40 ++--
.../structure/io/gryo/JavaTimeSerializers.java | 127 +++++-------
.../structure/io/gryo/PairSerializer.java | 11 +-
.../structure/io/gryo/TypeRegistration.java | 12 ++
.../io/gryo/kryoshim/KryoShimService.java | 16 ++
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 23 ++-
.../io/gryo/kryoshim/SerializerShim.java | 2 +-
.../hadoop/process/computer/HadoopCombine.java | 3 +-
.../hadoop/process/computer/HadoopMap.java | 3 +-
.../hadoop/process/computer/HadoopReduce.java | 3 +-
.../structure/io/HadoopPoolShimService.java | 7 +
.../structure/io/HadoopPoolsConfigurable.java | 4 +-
.../structure/io/gryo/GryoRecordReader.java | 3 +-
.../structure/io/gryo/GryoRecordWriter.java | 4 +-
.../spark/process/computer/SparkExecutor.java | 3 +-
.../structure/io/TinkerPopKryoRegistrator.java | 121 ------------
.../spark/structure/io/gryo/GryoSerializer.java | 2 +-
.../io/gryo/IoRegistryAwareKryoSerializer.java | 116 +++++++++++
.../io/gryo/TinkerPopKryoRegistrator.java | 194 +++++++++++++++++++
.../unshaded/UnshadedKryoShimService.java | 131 ++++++++-----
...n.structure.io.gryo.kryoshim.KryoShimService | 1 +
.../spark/structure/io/ToyGraphInputRDD.java | 3 +-
25 files changed, 572 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
index 86b733c..0122ab4 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.util.Iterator;
@@ -45,7 +46,7 @@ public final class GiraphWorkerContext extends WorkerContext {
public void preApplication() throws InstantiationException, IllegalAccessException {
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
- HadoopPools.initialize(apacheConfiguration);
+ // Pass the job configuration to KryoShimServiceLoader, which applies it to the KryoShimService it loads
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1));
this.memory = new GiraphMemory(this, vertexProgram);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 851b03c..41ca44d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
+import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
@@ -369,6 +370,20 @@ public final class GryoMapper implements Mapper<Kryo> {
private Supplier<ClassResolver> classResolver = GryoClassResolver::new;
private Builder() {
+ // Validate the default registrations
+ // For justification of these default registration rules, see TinkerPopKryoRegistrator
+ // A default registration is accepted when it has no serializer at all, when it supplies a
+ // SerializerShim, or when its shaded serializer is a JavaSerializer; anything else is rejected.
+ for (TypeRegistration<?> tr : typeRegistrations) {
+ if (tr.hasSerializer() /* registrations with no serializer at all are acceptable */ &&
+ null == tr.getSerializerShim() /* a shim serializer is acceptable */ &&
+ !(tr.getShadedSerializer() instanceof JavaSerializer) /* shaded JavaSerializer is acceptable */) {
+ // everything else (e.g. a non-Java shaded serializer, or only a function of shaded Kryo) is invalid
+ String msg = String.format("The default GryoMapper type registration %s is invalid. " +
+ "It must supply either an implementation of %s or %s, but supplies neither. " +
+ "This is probably a bug in GryoMapper's default serialization class registrations.", tr,
+ SerializerShim.class.getCanonicalName(), JavaSerializer.class.getCanonicalName());
+ throw new IllegalStateException(msg);
+ }
+ }
}
/**
@@ -538,8 +553,8 @@ public final class GryoMapper implements Mapper<Kryo> {
if (1 < serializerCount) {
String msg = String.format(
"GryoTypeReg accepts at most one kind of serializer, but multiple " +
- "serializers were supplied for class %s (id %s). " +
- "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.",
+ "serializers were supplied for class %s (id %s). " +
+ "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.",
this.clazz.getCanonicalName(), id,
this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo);
throw new IllegalArgumentException(msg);
@@ -603,5 +618,16 @@ public final class GryoMapper implements Mapper<Kryo> {
return kryo;
}
+
+ /**
+ * Describes this registration's target class, id, and serializer fields.
+ */
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("targetClass", clazz)
+ .append("id", id)
+ .append("shadedSerializer", shadedSerializer)
+ .append("serializerShim", serializerShim)
+ .append("functionOfShadedKryo", functionOfShadedKryo)
+ .toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index e7bf636..59f8a5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -40,6 +40,7 @@ import java.util.function.Function;
public final class GryoPool {
public static final String CONFIG_IO_REGISTRY = "gremlin.io.registry";
public static final String CONFIG_IO_GRYO_POOL_SIZE = "gremlin.io.gryo.poolSize";
+ // NOTE(review): presumably the fallback used when CONFIG_IO_GRYO_POOL_SIZE is not configured — confirm at call sites
+ public static final int CONFIG_IO_GRYO_POOL_SIZE_DEFAULT = 256;
public enum Type {READER, WRITER, READER_WRITER}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
index ae99ac6..16fbe85 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
@@ -23,16 +23,16 @@ import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
/**
* Class used to serialize graph-based objects such as vertices, edges, properties, and paths. These objects are
@@ -42,19 +42,19 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
* @author Stephen Mallette (http://stephen.genoprime.com)
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-final class GryoSerializers {
+public final class GryoSerializers {
/**
* Serializes any {@link Edge} implementation encountered to a {@link DetachedEdge}.
*/
- final static class EdgeSerializer extends Serializer<Edge> {
+ final static class EdgeSerializer implements SerializerShim<Edge> {
@Override
- public void write(final Kryo kryo, final Output output, final Edge edge) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Edge edge) {
kryo.writeClassAndObject(output, DetachedFactory.detach(edge, true));
}
@Override
- public Edge read(final Kryo kryo, final Input input, final Class<Edge> edgeClass) {
+ public <I extends InputShim> Edge read(KryoShim<I, ?> kryo, I input, Class<Edge> edgeClass) {
final Object o = kryo.readClassAndObject(input);
return (Edge) o;
}
@@ -63,14 +63,14 @@ final class GryoSerializers {
/**
* Serializes any {@link Vertex} implementation encountered to an {@link DetachedVertex}.
*/
- final static class VertexSerializer extends Serializer<Vertex> {
+ final static class VertexSerializer implements SerializerShim<Vertex> {
@Override
- public void write(final Kryo kryo, final Output output, final Vertex vertex) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Vertex vertex) {
kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, true));
}
@Override
- public Vertex read(final Kryo kryo, final Input input, final Class<Vertex> vertexClass) {
+ public <I extends InputShim> Vertex read(KryoShim<I, ?> kryo, I input, Class<Vertex> vertexClass) {
return (Vertex) kryo.readClassAndObject(input);
}
}
@@ -78,14 +78,14 @@ final class GryoSerializers {
/**
* Serializes any {@link Property} implementation encountered to an {@link DetachedProperty}.
*/
- final static class PropertySerializer extends Serializer<Property> {
+ final static class PropertySerializer implements SerializerShim<Property> {
@Override
- public void write(final Kryo kryo, final Output output, final Property property) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Property property) {
kryo.writeClassAndObject(output, property instanceof VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : DetachedFactory.detach(property));
}
@Override
- public Property read(final Kryo kryo, final Input input, final Class<Property> propertyClass) {
+ public <I extends InputShim> Property read(KryoShim<I, ?> kryo, I input, Class<Property> propertyClass) {
return (Property) kryo.readClassAndObject(input);
}
}
@@ -93,14 +93,14 @@ final class GryoSerializers {
/**
* Serializes any {@link VertexProperty} implementation encountered to an {@link DetachedVertexProperty}.
*/
- final static class VertexPropertySerializer extends Serializer<VertexProperty> {
+ final static class VertexPropertySerializer implements SerializerShim<VertexProperty> {
@Override
- public void write(final Kryo kryo, final Output output, final VertexProperty vertexProperty) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexProperty vertexProperty) {
kryo.writeClassAndObject(output, DetachedFactory.detach(vertexProperty, true));
}
@Override
- public VertexProperty read(final Kryo kryo, final Input input, final Class<VertexProperty> vertexPropertyClass) {
+ public <I extends InputShim> VertexProperty read(KryoShim<I, ?> kryo, I input, Class<VertexProperty> vertexPropertyClass) {
return (VertexProperty) kryo.readClassAndObject(input);
}
}
@@ -108,14 +108,14 @@ final class GryoSerializers {
/**
* Serializes any {@link Path} implementation encountered to an {@link DetachedPath}.
*/
- final static class PathSerializer extends Serializer<Path> {
+ // PathSerializer is public while the sibling serializers stay package-private; presumably so it can be referenced outside this package — confirm at call sites
+ public final static class PathSerializer implements SerializerShim<Path> {
@Override
- public void write(final Kryo kryo, final Output output, final Path path) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Path path) {
kryo.writeClassAndObject(output, DetachedFactory.detach(path, false));
}
@Override
- public Path read(final Kryo kryo, final Input input, final Class<Path> pathClass) {
+ public <I extends InputShim> Path read(KryoShim<I, ?> kryo, I input, Class<Path> pathClass) {
return (Path) kryo.readClassAndObject(input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
index 1d4e236..8b14345 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
@@ -18,10 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
import java.time.Duration;
import java.time.Instant;
@@ -48,17 +48,14 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link Duration} class.
*/
- final static class DurationSerializer extends Serializer<Duration>
- {
+ final static class DurationSerializer implements SerializerShim<Duration> {
@Override
- public void write(final Kryo kryo, final Output output, final Duration duration)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Duration duration) {
+ // a single long holding the total length in nanoseconds
output.writeLong(duration.toNanos());
}
@Override
- public Duration read(final Kryo kryo, final Input input, final Class<Duration> durationClass)
- {
+ public <I extends InputShim> Duration read(KryoShim<I, ?> kryo, I input, Class<Duration> durationClass) {
return Duration.ofNanos(input.readLong());
}
}
@@ -66,18 +63,15 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link Instant} class.
*/
- final static class InstantSerializer extends Serializer<Instant>
- {
+ final static class InstantSerializer implements SerializerShim<Instant> {
@Override
- public void write(Kryo kryo, Output output, Instant instant)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Instant instant) {
+ // epoch seconds (long) then nano-of-second (int); read() consumes them in the same order
output.writeLong(instant.getEpochSecond());
output.writeInt(instant.getNano());
}
@Override
- public Instant read(Kryo kryo, Input input, Class<Instant> aClass)
- {
+ public <I extends InputShim> Instant read(KryoShim<I, ?> kryo, I input, Class<Instant> aClass) {
return Instant.ofEpochSecond(input.readLong(), input.readInt());
}
}
@@ -85,17 +79,14 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link LocalDate} class.
*/
- final static class LocalDateSerializer extends Serializer<LocalDate>
- {
+ final static class LocalDateSerializer implements SerializerShim<LocalDate> {
@Override
- public void write(final Kryo kryo, final Output output, final LocalDate localDate)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDate localDate) {
+ // a single long holding the epoch-day count
output.writeLong(localDate.toEpochDay());
}
@Override
- public LocalDate read(final Kryo kryo, final Input input, final Class<LocalDate> clazz)
- {
+ public <I extends InputShim> LocalDate read(KryoShim<I, ?> kryo, I input, Class<LocalDate> clazz) {
return LocalDate.ofEpochDay(input.readLong());
}
}
@@ -103,11 +94,9 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link LocalDateTime} class.
*/
- final static class LocalDateTimeSerializer extends Serializer<LocalDateTime>
- {
+ final static class LocalDateTimeSerializer implements SerializerShim<LocalDateTime> {
@Override
- public void write(final Kryo kryo, final Output output, final LocalDateTime localDateTime)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDateTime localDateTime) {
output.writeInt(localDateTime.getYear());
output.writeInt(localDateTime.getMonthValue());
output.writeInt(localDateTime.getDayOfMonth());
@@ -118,8 +107,7 @@ final class JavaTimeSerializers {
}
@Override
- public LocalDateTime read(final Kryo kryo, final Input input, final Class<LocalDateTime> clazz)
- {
+ public <I extends InputShim> LocalDateTime read(KryoShim<I, ?> kryo, I input, Class<LocalDateTime> clazz) {
return LocalDateTime.of(input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt());
}
}
@@ -127,17 +115,14 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link LocalTime} class.
*/
- final static class LocalTimeSerializer extends Serializer<LocalTime>
- {
+ final static class LocalTimeSerializer implements SerializerShim<LocalTime> {
@Override
- public void write(final Kryo kryo, final Output output, final LocalTime localTime)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalTime localTime) {
+ // a single long holding the nano-of-day
output.writeLong(localTime.toNanoOfDay());
}
@Override
- public LocalTime read(final Kryo kryo, final Input input, final Class<LocalTime> clazz)
- {
+ public <I extends InputShim> LocalTime read(KryoShim<I, ?> kryo, I input, Class<LocalTime> clazz) {
return LocalTime.ofNanoOfDay(input.readLong());
}
}
@@ -145,37 +130,31 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link MonthDay} class.
*/
- final static class MonthDaySerializer extends Serializer<MonthDay>
- {
+ final static class MonthDaySerializer implements SerializerShim<MonthDay> {
@Override
- public void write(final Kryo kryo, final Output output, final MonthDay monthDay)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, MonthDay monthDay) {
output.writeInt(monthDay.getMonthValue());
output.writeInt(monthDay.getDayOfMonth());
}
@Override
- public MonthDay read(final Kryo kryo, final Input input, final Class<MonthDay> clazz)
- {
- return MonthDay.of(input.readInt(), input.readInt());
+ public <I extends InputShim> MonthDay read(KryoShim<I, ?> kryo, I input, Class<MonthDay> clazz) {
+ return MonthDay.of(input.readInt(), input.readInt()); // month, then day: must consume exactly what write() emitted
}
}
/**
* Serializer for the {@link OffsetDateTime} class.
*/
- final static class OffsetDateTimeSerializer extends Serializer<OffsetDateTime>
- {
+ final static class OffsetDateTimeSerializer implements SerializerShim<OffsetDateTime> {
@Override
- public void write(final Kryo kryo, final Output output, final OffsetDateTime offsetDateTime)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetDateTime offsetDateTime) {
kryo.writeObject(output, offsetDateTime.toLocalDateTime());
kryo.writeObject(output, offsetDateTime.getOffset());
}
@Override
- public OffsetDateTime read(final Kryo kryo, final Input input, final Class<OffsetDateTime> clazz)
- {
+ public <I extends InputShim> OffsetDateTime read(KryoShim<I, ?> kryo, I input, Class<OffsetDateTime> clazz) {
return OffsetDateTime.of(kryo.readObject(input, LocalDateTime.class), kryo.readObject(input, ZoneOffset.class));
}
}
@@ -183,18 +162,15 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link OffsetTime} class.
*/
- final static class OffsetTimeSerializer extends Serializer<OffsetTime>
- {
+ final static class OffsetTimeSerializer implements SerializerShim<OffsetTime> {
@Override
- public void write(final Kryo kryo, final Output output, final OffsetTime offsetTime)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetTime offsetTime) {
+ // the LocalTime followed by the ZoneOffset, each written with kryo.writeObject
kryo.writeObject(output, offsetTime.toLocalTime());
kryo.writeObject(output, offsetTime.getOffset());
}
@Override
- public OffsetTime read(final Kryo kryo, final Input input, final Class<OffsetTime> clazz)
- {
+ public <I extends InputShim> OffsetTime read(KryoShim<I, ?> kryo, I input, Class<OffsetTime> clazz) {
return OffsetTime.of(kryo.readObject(input, LocalTime.class), kryo.readObject(input, ZoneOffset.class));
}
}
@@ -202,19 +178,16 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link Period} class.
*/
- final static class PeriodSerializer extends Serializer<Period>
- {
+ final static class PeriodSerializer implements SerializerShim<Period> {
@Override
- public void write(final Kryo kryo, final Output output, final Period period)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Period period) {
output.writeInt(period.getYears());
output.writeInt(period.getMonths());
output.writeInt(period.getDays());
}
@Override
- public Period read(final Kryo kryo, final Input input, final Class<Period> clazz)
- {
+ public <I extends InputShim> Period read(KryoShim<I, ?> kryo, I input, Class<Period> clazz) {
return Period.of(input.readInt(), input.readInt(), input.readInt());
}
}
@@ -222,17 +195,14 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link Year} class.
*/
- final static class YearSerializer extends Serializer<Year>
- {
+ final static class YearSerializer implements SerializerShim<Year> {
@Override
- public void write(final Kryo kryo, final Output output, final Year year)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Year year) {
output.writeInt(year.getValue());
}
@Override
- public Year read(final Kryo kryo, final Input input, final Class<Year> clazz)
- {
+ public <I extends InputShim> Year read(KryoShim<I, ?> kryo, I input, Class<Year> clazz) {
return Year.of(input.readInt());
}
}
@@ -240,18 +210,15 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link YearMonth} class.
*/
- final static class YearMonthSerializer extends Serializer<YearMonth>
- {
+ final static class YearMonthSerializer implements SerializerShim<YearMonth> {
@Override
- public void write(final Kryo kryo, final Output output, final YearMonth monthDay)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, YearMonth monthDay) {
output.writeInt(monthDay.getYear());
output.writeInt(monthDay.getMonthValue());
}
@Override
- public YearMonth read(final Kryo kryo, final Input input, final Class<YearMonth> clazz)
- {
+ public <I extends InputShim> YearMonth read(KryoShim<I, ?> kryo, I input, Class<YearMonth> clazz) {
return YearMonth.of(input.readInt(), input.readInt());
}
}
@@ -259,11 +226,9 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link ZonedDateTime} class.
*/
- final static class ZonedDateTimeSerializer extends Serializer<ZonedDateTime>
- {
+ final static class ZonedDateTimeSerializer implements SerializerShim<ZonedDateTime> {
@Override
- public void write(final Kryo kryo, final Output output, final ZonedDateTime zonedDateTime)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZonedDateTime zonedDateTime) {
output.writeInt(zonedDateTime.getYear());
output.writeInt(zonedDateTime.getMonthValue());
output.writeInt(zonedDateTime.getDayOfMonth());
@@ -275,8 +240,7 @@ final class JavaTimeSerializers {
}
@Override
- public ZonedDateTime read(final Kryo kryo, final Input input, final Class<ZonedDateTime> clazz)
- {
+ public <I extends InputShim> ZonedDateTime read(KryoShim<I, ?> kryo, I input, Class<ZonedDateTime> clazz) {
+ // seven ints followed by the zone id string; this order must match write()
return ZonedDateTime.of(input.readInt(), input.readInt(), input.readInt(),
input.readInt(), input.readInt(), input.readInt(), input.readInt(),
ZoneId.of(input.readString()));
@@ -286,17 +250,14 @@ final class JavaTimeSerializers {
/**
* Serializer for the {@link ZoneOffset} class.
*/
- final static class ZoneOffsetSerializer extends Serializer<ZoneOffset>
- {
+ final static class ZoneOffsetSerializer implements SerializerShim<ZoneOffset> {
@Override
- public void write(final Kryo kryo, final Output output, final ZoneOffset zoneOffset)
- {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZoneOffset zoneOffset) {
+ // the offset's id string (ZoneOffset.getId()); read() parses it back with ZoneOffset.of
output.writeString(zoneOffset.getId());
}
@Override
- public ZoneOffset read(final Kryo kryo, final Input input, final Class<ZoneOffset> clazz)
- {
+ public <I extends InputShim> ZoneOffset read(KryoShim<I, ?> kryo, I input, Class<ZoneOffset> clazz) {
return ZoneOffset.of(input.readString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
index e5e92e7..0464b22 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
@@ -18,6 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+// NOTE(review): the org.apache.tinkerpop.shaded.kryo imports below appear unused now that this class implements SerializerShim; consider removing them
import org.apache.tinkerpop.shaded.kryo.Kryo;
import org.apache.tinkerpop.shaded.kryo.Serializer;
import org.apache.tinkerpop.shaded.kryo.io.Input;
@@ -27,16 +31,15 @@ import org.javatuples.Pair;
/**
* @author Daniel Kuppitz (http://gremlin.guru)
*/
-final class PairSerializer extends Serializer<Pair> {
-
+final class PairSerializer implements SerializerShim<Pair> {
+ // Writes value0 and then value1 with writeClassAndObject; read() restores them in the same order.
@Override
- public void write(final Kryo kryo, final Output output, final Pair pair) {
+ public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Pair pair) {
kryo.writeClassAndObject(output, pair.getValue0());
kryo.writeClassAndObject(output, pair.getValue1());
}
@Override
- public Pair read(final Kryo kryo, final Input input, final Class<Pair> pairClass) {
+ public <I extends InputShim> Pair read(KryoShim<I, ?> kryo, I input, Class<Pair> pairClass) {
return Pair.with(kryo.readClassAndObject(input), kryo.readClassAndObject(input));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
index ef105ce..1f41c0d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
@@ -67,4 +67,16 @@ public interface TypeRegistration<T> {
* @return the sole parameter
*/
Kryo registerWith(Kryo kryo);
+
+ /**
+ * Returns true if at least one of {@link #getShadedSerializer()}, {@link #getSerializerShim()}, or
+ * {@link #getFunctionOfShadedKryo()} is non-null. Returns false if all three are null.
+ *
+ * @return whether a serializer is defined for this type registration
+ */
+ default boolean hasSerializer() {
+ return null != getFunctionOfShadedKryo() ||
+ null != getSerializerShim() ||
+ null != getShadedSerializer();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index 959605c..7783856 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -18,6 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
+import org.apache.commons.configuration.Configuration;
+
import java.io.InputStream;
import java.io.OutputStream;
@@ -80,4 +82,18 @@ public interface KryoShimService {
* @return this implementation's priority value
*/
int getPriority();
+
+ /**
+ * Attempt to incorporate the supplied configuration in future read/write calls.
+ * <p>
+ * This method is a wart that exists essentially just to support the old
+ * {@code HadoopPools.initialize(Configuration)} use-case.
+ * <p>
+ * This method is not guaranteed to have any effect on an instance of this interface
+ * after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)}
+ * has been invoked on that particular instance.
+ *
+ * @param conf the configuration to apply to this service's internal serializer
+ */
+ void applyConfiguration(Configuration conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 9ccf2de..9184dd0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.shaded.kryo.io.Input;
import org.apache.tinkerpop.shaded.kryo.io.Output;
import org.slf4j.Logger;
@@ -35,7 +36,9 @@ import java.util.ServiceLoader;
*/
public class KryoShimServiceLoader {
- private static volatile KryoShimService CACHED_SHIM_SERVICE;
+ private static volatile KryoShimService cachedShimService;
+
+ private static volatile Configuration conf;
private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class);
@@ -46,6 +49,23 @@ public class KryoShimServiceLoader {
*/
public static final String SHIM_CLASS_SYSTEM_PROPERTY = "tinkerpop.kryo.shim";
+ /**
+ * Set the configuration that this loader applies to the {@link KryoShimService} it loads.
+ * <p>
+ * If a shim service has already been loaded and cached, it is discarded and reloaded so that
+ * the new configuration is honored: {@link KryoShimService#applyConfiguration(Configuration)}
+ * is not guaranteed to affect an instance that has already been used for reads or writes.
+ *
+ * @param conf the configuration to apply to the shim service
+ */
+ public static void applyConfiguration(Configuration conf) {
+ KryoShimServiceLoader.conf = conf;
+ if (null != cachedShimService) {
+ // the cached service was created before this call; reload so the new conf takes effect
+ load(true);
+ }
+ }
+
/**
* Return a reference to the shim service. This method may return a cached shim service
* unless {@code forceReload} is true. Calls to this method need not be externally
@@ -58,8 +78,8 @@ public class KryoShimServiceLoader {
*/
public static KryoShimService load(boolean forceReload) {
- if (null != CACHED_SHIM_SERVICE && !forceReload) {
- return CACHED_SHIM_SERVICE;
+ if (null != cachedShimService && !forceReload) {
+ return cachedShimService;
}
ArrayList<KryoShimService> services = new ArrayList<>();
@@ -109,7 +129,15 @@ public class KryoShimServiceLoader {
log.info("Set {} provider to {} ({}) because its priority value ({}) is the highest available",
KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority());
- return CACHED_SHIM_SERVICE = result;
+ Configuration userConf = conf;
+
+ if (null != userConf) {
+ log.info("Configuring {} provider {} with user-provided configuration",
+ KryoShimService.class.getSimpleName(), result);
+ result.applyConfiguration(userConf);
+ }
+
+ return cachedShimService = result;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
index 191cdd8..e5f9005 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
@@ -26,7 +26,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
*/
public interface SerializerShim<T> {
- <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T starGraph);
+ // NOTE: the last parameter was previously named "starGraph"; it is now "object" because this
+ // interface is generic in T and is not specific to StarGraph.
+ <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T object);
<I extends InputShim> T read(KryoShim<I, ?> kryo, I input, Class<T> clazz);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
index de1e2f9..06778e6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public final class HadoopCombine extends Reducer<ObjectWritable, ObjectWritable,
@Override
public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
- HadoopPools.initialize(apacheConfiguration);
+ // hand the job configuration to the kryo shim loader instead of initializing HadoopPools directly
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
index 9e6fac3..5fc7026 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public final class HadoopMap extends Mapper<NullWritable, VertexWritable, Object
@Override
public void setup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
- HadoopPools.initialize(apacheConfiguration);
+ // hand the job configuration to the kryo shim loader instead of initializing HadoopPools directly
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
this.mapReduce.workerStart(MapReduce.Stage.MAP);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
index 06dfba1..6ca7b8f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public final class HadoopReduce extends Reducer<ObjectWritable, ObjectWritable,
@Override
public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
- HadoopPools.initialize(apacheConfiguration);
+ // hand the job configuration to the kryo shim loader instead of initializing HadoopPools directly
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index c19b914..5753d90 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -18,7 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
import org.apache.tinkerpop.shaded.kryo.Kryo;
import org.apache.tinkerpop.shaded.kryo.io.Input;
import org.apache.tinkerpop.shaded.kryo.io.Output;
@@ -66,4 +67,18 @@ public class HadoopPoolShimService implements KryoShimService {
public int getPriority() {
return 0;
}
+
+ /**
+ * Apply {@code conf} by initializing {@link HadoopPools} with it.
+ * <p>
+ * This must not delegate to {@code KryoShimServiceLoader.applyConfiguration}: the loader calls this
+ * method with the configuration it has stored, so delegating back would merely store the same
+ * configuration again and {@link HadoopPools} would never be initialized.
+ *
+ * @param conf the configuration passed to {@code HadoopPools.initialize}
+ */
+ @Override
+ public void applyConfiguration(final Configuration conf) {
+ HadoopPools.initialize(conf);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
index f3a1bac..0e5f135 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -28,7 +30,10 @@ public interface HadoopPoolsConfigurable extends Configurable {
@Override
public default void setConf(final Configuration configuration) {
HadoopPools.initialize(configuration);
+ // keep initializing HadoopPools directly: KryoShimServiceLoader.applyConfiguration only reaches
+ // HadoopPools indirectly, through whichever shim service gets loaded
+ KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index d7ed46b..a1daddf 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -71,7 +72,10 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
final Configuration configuration = context.getConfiguration();
if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
HadoopPools.initialize(configuration);
+ // the reader below is taken straight from HadoopPools, so HadoopPools is still initialized directly;
+ // the kryo shim service is additionally given the same configuration
+ KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
this.gryoReader = HadoopPools.getGryoPool().takeReader();
long start = split.getStart();
final Path file = split.getPath();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
index 67a8339..2ea3394 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -43,7 +45,10 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri
public GryoRecordWriter(final DataOutputStream outputStream, final Configuration configuration) {
this.outputStream = outputStream;
this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true);
HadoopPools.initialize(configuration);
+ // the writer below is taken straight from HadoopPools, so HadoopPools is still initialized directly;
+ // the kryo shim service is additionally given the same configuration
+ KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
this.gryoWriter = HadoopPools.getGryoPool().takeWriter();
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index c2b85dd..9e5ac53 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -38,6 +38,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -89,7 +90,7 @@ public final class SparkExecutor {
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices emit a view and their outgoing messages
.mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
+ // per partition: hand the job configuration to the kryo shim loader (replaces the direct HadoopPools.initialize call)
+ KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
deleted file mode 100644
index 4c99e70..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-import org.apache.spark.serializer.KryoRegistrator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.ObjectWritableSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A spark.kryo.registrator implementation that installs TinkerPop types.
- * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer.
- */
-public class TinkerPopKryoRegistrator implements KryoRegistrator {
-
- private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
-
- @Override
- public void registerClasses(Kryo kryo) {
- // TinkerPop type registrations copied from GyroSerializer's constructor
- kryo.register(MessagePayload.class);
- kryo.register(ViewIncomingPayload.class);
- kryo.register(ViewOutgoingPayload.class);
- kryo.register(ViewPayload.class);
- kryo.register(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
- kryo.register(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
-
- Set<Class<?>> shimmedClasses = new HashSet<>();
-
- Set<Class<?>> javaSerializationClasses = new HashSet<>();
-
- // Copy GryoMapper's default registrations
- for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) {
- // Special case for JavaSerializer, which is generally implemented in terms of TinkerPop's
- // problematic static GryoMapper/GryoSerializer pool (these are handled below the loop)
- org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
- SerializerShim<?> serializerShim = tr.getSerializerShim();
- if (null != shadedSerializer &&
- shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
- javaSerializationClasses.add(tr.getTargetClass());
- } else if (null != serializerShim) {
- log.debug("Registering class {} to serializer shim {} (serializer shim class {})",
- tr.getTargetClass(), serializerShim, serializerShim.getClass());
- kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim));
- shimmedClasses.add(tr.getTargetClass());
- } else {
- // Register with the default behavior (FieldSerializer)
- log.debug("Registering class {} with default serializer", tr.getTargetClass());
- kryo.register(tr.getTargetClass());
- }
- }
-
- Map<Class<?>, Serializer<?>> javaSerializerReplacements = new HashMap<>();
- javaSerializerReplacements.put(GroupStep.GroupBiOperator.class, new JavaSerializer());
- javaSerializerReplacements.put(OrderGlobalStep.OrderBiOperator.class, null);
- javaSerializerReplacements.put(TraversalExplanation.class, null);
-
- for (Map.Entry<Class<?>, Serializer<?>> e : javaSerializerReplacements.entrySet()) {
- Class<?> c = e.getKey();
- Serializer<?> s = e.getValue();
-
- if (javaSerializationClasses.remove(c)) {
- if (null != s) {
- log.debug("Registering class {} with serializer {}", c, s);
- kryo.register(c, s);
- } else {
- log.debug("Registering class {} with default serializer", c);
- kryo.register(c);
- }
- } else {
- log.debug("Registering class {} with JavaSerializer", c);
- kryo.register(c, new JavaSerializer());
- }
- }
-
- // We really care about StarGraph's shim serializer, so make sure we registered it
- if (!shimmedClasses.contains(StarGraph.class)) {
- log.warn("No SerializerShim found for StarGraph");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 2c1dfa2..28a4d55 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -78,7 +78,7 @@ public final class GryoSerializer extends Serializer {
}
}
this.gryoPool = GryoPool.build().
- poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+ // fall back to GryoPool's shared default constant rather than a hard-coded 256
+ poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
initializeMapper(builder -> {
try {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
new file mode 100644
index 0000000..8b21e21
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ * <p>
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
+ * <p>
+ * The setting is read as a comma-separated list of {@link IoRegistry} class names. Each registry is
+ * instantiated reflectively and the classes it reports for {@link GryoIo} are registered on every
+ * {@link Kryo} returned by {@link #newKryo()}. Registrations with no serializer or with an unshaded
+ * {@link Serializer} are honored as-is; any other serializer type is unsupported here, so its class
+ * is registered with Spark's default behavior instead. A registry that cannot be instantiated, or
+ * whose {@code find} returns {@code null}, is logged and skipped without affecting the others.
+ */
+public class IoRegistryAwareKryoSerializer extends KryoSerializer {
+
+ private final SparkConf conf;
+
+ private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
+
+ /**
+ * @param conf the Spark configuration; retained so {@link #newKryo()} can read the registry setting
+ */
+ public IoRegistryAwareKryoSerializer(final SparkConf conf) {
+ super(conf);
+ // store conf so that we can access its registry (if one is present) in newKryo()
+ this.conf = conf;
+ }
+
+ /**
+ * Create a {@link Kryo} through Spark's {@link KryoSerializer}, then apply any configured
+ * {@link IoRegistry} registrations to it.
+ */
+ @Override
+ public Kryo newKryo() {
+ final Kryo kryo = super.newKryo();
+
+ return applyIoRegistryIfPresent(kryo);
+ }
+
+ /**
+ * Register the classes of every {@link IoRegistry} named in {@link GryoPool#CONFIG_IO_REGISTRY}.
+ *
+ * @param kryo the instance to register classes on
+ * @return the same {@code kryo} instance
+ */
+ private Kryo applyIoRegistryIfPresent(final Kryo kryo) {
+ if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) {
+ log.info("SparkConf {} does not contain setting {}, skipping {} handling",
+ conf, GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName());
+ return kryo;
+ }
+
+ final String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY);
+
+ for (final String rawClassname : registryClassnames.split(",")) {
+ // tolerate whitespace around the commas and empty entries such as a trailing comma
+ final String registryClassname = rawClassname.trim();
+ if (registryClassname.isEmpty()) {
+ continue;
+ }
+
+ final IoRegistry registry;
+
+ try {
+ registry = (IoRegistry) Class.forName(registryClassname).newInstance();
+ log.info("Instantiated {}", registryClassname);
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+ // skip only this registry so that the remaining ones are still applied
+ log.error("Unable to reflectively instantiate the {} implementation named {}",
+ IoRegistry.class.getCanonicalName(), registryClassname, e);
+ continue;
+ }
+
+ // Left is the class targeted for serialization, right is a mess of potential types, including
+ // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer>
+ final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
+
+ if (null == serializers) {
+ log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry",
+ GryoIo.class.getCanonicalName(), registry);
+ continue;
+ }
+
+ for (final Pair<Class, Object> p : serializers) {
+ if (null == p.getValue1()) {
+ // null on the right is fine
+ log.info("Registering {} with default serializer", p.getValue0());
+ kryo.register(p.getValue0());
+ } else if (p.getValue1() instanceof Serializer) {
+ // unshaded serializer on the right is fine
+ log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1());
+ kryo.register(p.getValue0(), (Serializer) p.getValue1());
+ } else {
+ // anything else on the right is unsupported with Spark
+ log.error("Serializer {} found in {} must implement {} " +
+ "(the shaded interface {} is not supported on Spark). This class will be registered with " +
+ "the default behavior of Spark's KryoSerializer.",
+ p.getValue1(), registryClassname, Serializer.class.getCanonicalName(),
+ org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+ kryo.register(p.getValue0());
+ }
+ }
+ }
+
+ return kryo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
new file mode 100644
index 0000000..bdb80fd
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * A spark.kryo.registrator implementation that installs TinkerPop types.
+ * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer.
+ */
+public class TinkerPopKryoRegistrator implements KryoRegistrator {
+
+ private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
+
+ @Override
+ public void registerClasses(Kryo kryo) {
+ registerClasses(kryo, Collections.emptyMap(), Collections.emptySet());
+ }
+
+ /**
+ * Register TinkerPop's classes with the supplied {@link Kryo} instance
+ * while honoring optional overrides and optional class blacklist ("blackset"?).
+ *
+ * @param kryo the Kryo serializer instance with which to register types
+ * @param serializerOverrides serializer mappings that override this class's defaults
+ * @param blacklist classes which should not be registered at all, even if there is an override entry
+ * or if they would be registered by this class by default (does not affect Kryo's
+ * built-in registrations, e.g. String.class).
+ */
+ public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> serializerOverrides, Set<Class<?>> blacklist) {
+ // Apply TinkerPop type registrations copied from GyroSerializer's constructor
+ for (Map.Entry<Class<?>, Serializer<?>> ent : getExtraRegistrations().entrySet()) {
+ Class<?> targetClass = ent.getKey();
+ Serializer<?> ser = ent.getValue();
+
+ // Is this class blacklisted? Skip it. (takes precedence over serializerOverrides)
+ if (blacklist.contains(targetClass)) {
+ log.debug("Not registering serializer for {} (blacklisted)", targetClass);
+ continue;
+ }
+
+ if (checkForAndApplySerializerOverride(serializerOverrides, kryo, targetClass)) {
+ // do nothing but skip the remaining else(-if) clauses
+ } else if (null == ser) {
+ log.debug("Registering {} with default serializer", targetClass);
+ kryo.register(targetClass);
+ } else {
+ log.debug("Registering {} with serializer {}", targetClass, ser);
+ kryo.register(targetClass, ser);
+ }
+ }
+
+ Set<Class<?>> shimmedClassesFromGryoMapper = new HashSet<>();
+
+ // Apply GryoMapper's default registrations
+ for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) {
+ // Is this class blacklisted? Skip it. (takes precedence over serializerOverrides)
+ if (blacklist.contains(tr.getTargetClass())) {
+ log.debug("Not registering serializer for {} (blacklisted)", tr.getTargetClass());
+ continue;
+ }
+
+ final org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
+ final SerializerShim<?> serializerShim = tr.getSerializerShim();
+ final java.util.function.Function<
+ org.apache.tinkerpop.shaded.kryo.Kryo,
+ org.apache.tinkerpop.shaded.kryo.Serializer> functionOfShadedKryo = tr.getFunctionOfShadedKryo();
+
+ // Apply overrides with the highest case-precedence
+ if (checkForAndApplySerializerOverride(serializerOverrides, kryo, tr.getTargetClass())) {
+ // do nothing but skip the remaining else(-if) clauses
+ } else if (null != shadedSerializer) {
+ if (shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
+ // Convert GryoMapper's shaded JavaSerializer mappings to their unshaded equivalents
+ log.debug("Registering {} with JavaSerializer", tr.getTargetClass());
+ kryo.register(tr.getTargetClass(), new JavaSerializer());
+ } else {
+ // There's supposed to be a check in GryoMapper that prevents this from happening
+ log.error("GryoMapper's default serialization registration for {} is a {}. " +
+ "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+ "I am configuring Spark to use Kryo's default serializer for this class, " +
+ "but this may cause serialization failures at runtime.",
+ tr.getTargetClass(),
+ org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+ kryo.register(tr.getTargetClass());
+ }
+ } else if (null != serializerShim) {
+ // Wrap shim serializers in an adapter for Spark's unshaded Kryo
+ log.debug("Registering {} to serializer shim {} (serializer shim {})",
+ tr.getTargetClass(), serializerShim, serializerShim.getClass());
+ kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim));
+ shimmedClassesFromGryoMapper.add(tr.getTargetClass());
+ } else if (null != functionOfShadedKryo) {
+ // As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening
+ log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " +
+ "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+ "I am configuring Spark to use Kryo's default serializer instead of this function, " +
+ "but this may cause serialization failures at runtime.",
+ tr.getTargetClass(),
+ org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
+ org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+ kryo.register(tr.getTargetClass());
+ } else {
+ // Register all other classes with the default behavior (FieldSerializer)
+ log.debug("Registering {} with default serializer", tr.getTargetClass());
+ kryo.register(tr.getTargetClass());
+ }
+ }
+
+ // StarGraph's shim serializer is especially important on Spark for efficiency reasons,
+ // so log a warning if we failed to register it somehow
+ if (!shimmedClassesFromGryoMapper.contains(StarGraph.class)) {
+ log.warn("No SerializerShim found for StarGraph");
+ }
+ }
+
+ private LinkedHashMap<Class<?>, Serializer<?>> getExtraRegistrations() {
+
+ /* The map returned by this method MUST have a fixed iteration order!
+ *
+ * The order itself is irrelevant, so long as it is completely stable at runtime.
+ *
+ * LinkedHashMap satisfies this requirement (its contract specifies
+ * iteration in key-insertion-order).
+ */
+
+ LinkedHashMap<Class<?>, Serializer<?>> m = new LinkedHashMap<>();
+ // The following entries were copied from GryoSerializer's constructor
+ // This could be turned into a static collection on GryoSerializer to avoid
+ // duplication, but it would be a bit cumbersome to do so without disturbing
+ // the ordering of the existing entries in that constructor, since not all
+ // of the entries are for TinkerPop (and the ordering is significant).
+ m.put(MessagePayload.class, null);
+ m.put(ViewIncomingPayload.class, null);
+ m.put(ViewOutgoingPayload.class, null);
+ m.put(ViewPayload.class, null);
+ m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
+ m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
+
+ return m;
+ }
+
+ private boolean checkForAndApplySerializerOverride(Map<Class<?>, Serializer<?>> serializerOverrides,
+ Kryo kryo, Class<?> targetClass) {
+ if (serializerOverrides.containsKey(targetClass)) {
+ Serializer<?> ser = serializerOverrides.get(targetClass);
+ if (null == ser) {
+ // null means use Kryo's default serializer
+ log.debug("Registering {} with default serializer per overrides", targetClass);
+ kryo.register(targetClass);
+ } else {
+ // nonnull means use that serializer
+ log.debug("Registering {} with serializer {} per overrides", targetClass, ser);
+ kryo.register(targetClass, ser);
+ }
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index d0411e8..a524a97 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -24,92 +24,131 @@
*/
package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
-import com.twitter.chill.KryoInstantiator;
-import com.twitter.chill.KryoPool;
-import com.twitter.chill.SerDeState;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.TinkerPopKryoRegistrator;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.concurrent.LinkedBlockingQueue;
public class UnshadedKryoShimService implements KryoShimService {
- public static final String SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY = "tinkerpop.kryo.poolsize";
-
private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
- private static final int SPARK_KRYO_POOL_SIZE_DEFAULT = 8;
- private final KryoSerializer sparkKryoSerializer;
- private final KryoPool kryoPool;
+ private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
- public UnshadedKryoShimService() {
- this(TinkerPopKryoRegistrator.class.getCanonicalName(), getDefaultKryoPoolSize());
- }
+ private static volatile boolean initialized;
- public UnshadedKryoShimService(String sparkKryoRegistratorClassname, int kryoPoolSize) {
- SparkConf sparkConf = new SparkConf();
- sparkConf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
- sparkConf.set("spark.kryo.registrator", sparkKryoRegistratorClassname);
- sparkKryoSerializer = new KryoSerializer(sparkConf);
- kryoPool = KryoPool.withByteArrayOutputStream(kryoPoolSize, new KryoInstantiator());
- }
+ public UnshadedKryoShimService() { }
@Override
public Object readClassAndObject(InputStream source) {
- SerDeState sds = null;
- try {
- sds = kryoPool.borrow();
- sds.setInput(source);
+ LinkedBlockingQueue<Kryo> kryos = initialize();
- return sds.readClassAndObject();
+ Kryo k = null;
+ try {
+ k = kryos.take();
+
+ return k.readClassAndObject(new Input(source));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
} finally {
- kryoPool.release(sds);
+ try {
+ kryos.put(k);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
@Override
public void writeClassAndObject(Object o, OutputStream sink) {
- SerDeState sds = null;
+
+ LinkedBlockingQueue<Kryo> kryos = initialize();
+
+ Kryo k = null;
try {
- sds = kryoPool.borrow();
+ k = kryos.take();
- sds.writeClassAndObject(o); // this writes to an internal buffer
+ Output kryoOutput = new Output(sink);
- sds.writeOutputTo(sink); // this copies the internal buffer to sink
+ k.writeClassAndObject(kryoOutput, o);
- sink.flush();
- } catch (IOException e) {
+ kryoOutput.flush();
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
- kryoPool.release(sds);
+ try {
+ kryos.put(k);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
@Override
public int getPriority() {
- return 1024;
+ return 50;
}
- private static int getDefaultKryoPoolSize() {
- String raw = System.getProperty(SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
+ @Override
+ public void applyConfiguration(Configuration conf) {
+ initialize(conf);
+ }
- int size = SPARK_KRYO_POOL_SIZE_DEFAULT;
- try {
- size = Integer.valueOf(raw);
- log.info("Setting kryo pool size to {} according to system property {}", size,
- SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
- } catch (NumberFormatException e) {
- log.error("System property {}={} could not be parsed as an integer, using default value {}",
- SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY, raw, size, e);
+ private LinkedBlockingQueue<Kryo> initialize() {
+ return initialize(new BaseConfiguration());
+ }
+
+ private LinkedBlockingQueue<Kryo> initialize(Configuration conf) {
+ // DCL is safe in this case due to volatility
+ if (!initialized) {
+ synchronized (UnshadedKryoShimService.class) {
+ if (!initialized) {
+ SparkConf sparkConf = new SparkConf();
+
+ // Copy the user's IoRegistry from the param conf to the SparkConf we just created
+ String regStr = conf.getString(GryoPool.CONFIG_IO_REGISTRY);
+ if (null != regStr) { // SparkConf rejects null values with NPE, so this has to be checked before set(...)
+ sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
+ }
+ // Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt
+ sparkConf.set("spark.serializer", IoRegistryAwareKryoSerializer.class.getCanonicalName());
+
+ String registrator = conf.getString("spark.kryo.registrator");
+ if (null != registrator) {
+ sparkConf.set("spark.kryo.registrator", registrator);
+ log.info("Copied spark.kryo.registrator: {}", registrator);
+ } else {
+ log.info("Not copying spark.kryo.registrator");
+ }
+
+ // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
+ int poolSize = conf.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
+ GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+ // Instantiate the spark.serializer
+ final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf);
+ // Setup a pool backed by our spark.serializer instance
+
+ for (int i = 0; i < poolSize; i++) {
+ KRYOS.add(ioReg.newKryo());
+ }
+
+ initialized = true;
+ }
+ }
}
- return size;
+ return KRYOS;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
new file mode 100644
index 0000000..68712a6
--- /dev/null
+++ b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService # Supports Spark