You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/30 00:07:10 UTC
[06/21] incubator-tinkerpop git commit: GryoPool now completely uses
a GryoPool.Builder. Building a GryoPool is alot more elegant and consistent
now. Updated GryoPoolTest accordingly.
GryoPool now completely uses a GryoPool.Builder. Building a GryoPool is alot more elegant and consistent now. Updated GryoPoolTest accordingly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/802cf6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/802cf6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/802cf6a2
Branch: refs/heads/thread-issue-tinkergraph
Commit: 802cf6a2dac728d2b561c88a7c5f591c7b82591e
Parents: b32263e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Sep 28 16:22:41 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Sep 28 16:22:41 2015 -0600
----------------------------------------------------------------------
.../gremlin/structure/io/gryo/GryoPool.java | 151 ++++++++++---------
.../gremlin/structure/io/gryo/GryoPoolTest.java | 18 +--
.../hadoop/structure/io/HadoopPools.java | 14 +-
.../spark/structure/io/gryo/GryoSerializer.java | 4 +-
4 files changed, 100 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index 97884bb..0b6faab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -18,7 +18,6 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
-import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.shaded.kryo.Kryo;
@@ -46,7 +45,7 @@ public final class GryoPool {
private Queue<GryoReader> gryoReaders;
private Queue<GryoWriter> gryoWriters;
- private final GryoMapper mapper;
+ private GryoMapper mapper;
public static GryoPool.Builder build() {
return new GryoPool.Builder();
@@ -56,48 +55,10 @@ public final class GryoPool {
* Used by {@code GryoPool.Builder}.
*/
private GryoPool() {
- this.mapper = null;
- }
-
- /**
- * Create a pool of readers and writers from a {@code Configuration} object. There are two configuration keys
- * expected: "gremlin.io.registry" which defines comma separated list of the fully qualified class names of
- * {@link IoRegistry} implementations to use and the "gremlin.io.gryo.poolSize" which defines the initial size
- * of the {@code GryoPool}. As with usage of {@link GryoMapper.Builder#addRegistry(IoRegistry)}, the order in
- * which these items are added matters greatly. The order used for writing should be the order used for reading.
- */
- public GryoPool(final Configuration conf) {
- this(conf.getInt(CONFIG_IO_GRYO_POOL_SIZE, 256), Type.READER_WRITER,
- tryCreateIoRegistry(conf.getList(CONFIG_IO_REGISTRY, Collections.emptyList())));
- }
-
- /**
- * Create a pool of readers and writers of specified size and use the default {@link GryoMapper} (which means
- * that custom serializers from vendors will not be applied.
- *
- * @param poolSize initial size of the pool.
- */
- public GryoPool(final int poolSize) {
- this(poolSize, Type.READER_WRITER, Collections.emptyList());
- }
-
- /**
- * Create a pool of a readers, writers or both of the specified size with an optional {@link IoRegistry} object
- * which would allow custom serializers to be registered to the pool.
- *
- * @param poolSize initial size of the pool.
- * @param type the type of pool.
- * @param registries a list of registries to assign to each {@link GryoReader} and {@link GryoWriter} instances.
- */
- public GryoPool(final int poolSize, final Type type, final List<IoRegistry> registries) {
- final GryoMapper.Builder mapperBuilder = GryoMapper.build();
- registries.forEach(mapperBuilder::addRegistry);
- // should be able to re-use the GryoMapper - it creates fresh kryo instances from its createMapper method
- this.mapper = mapperBuilder.create();
- createPool(poolSize, type, mapper);
}
private void createPool(final int poolSize, final Type type, final GryoMapper gryoMapper) {
+ this.mapper = gryoMapper;
if (type.equals(Type.READER) || type.equals(Type.READER_WRITER)) {
this.gryoReaders = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
@@ -143,71 +104,92 @@ public final class GryoPool {
this.offerWriter(gryoWriter);
}
- private static List<IoRegistry> tryCreateIoRegistry(final List<Object> classNames) {
- if (classNames.isEmpty()) return Collections.emptyList();
-
- final List<IoRegistry> registries = new ArrayList<>();
- classNames.forEach(c -> {
- try {
- final String className = c.toString();
- final Class<?> clazz = Class.forName(className);
- try {
- final Method instanceMethod = clazz.getDeclaredMethod("getInstance");
- if (IoRegistry.class.isAssignableFrom(instanceMethod.getReturnType()))
- registries.add((IoRegistry) instanceMethod.invoke(null));
- else
- throw new Exception();
- } catch (Exception methodex) {
- // tried getInstance() and that failed so try newInstance() no-arg constructor
- registries.add((IoRegistry) clazz.newInstance());
- }
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- }
- });
- return registries;
- }
-
////
public static class Builder {
private int poolSize = 256;
+ private List<IoRegistry> ioRegistries = new ArrayList<>();
private Type type = Type.READER_WRITER;
private Consumer<GryoMapper.Builder> gryoMapperConsumer = null;
private Consumer<Kryo> kryoConsumer = null;
- private Configuration configuration = null;
- public Builder configuration(final Configuration configuration) {
- this.configuration = configuration;
+ /**
+ * The {@code IoRegistry} class names to use for the {@code GryoPool}
+ *
+ * @param ioRegistryClassNames a list of class names
+ * @return the update builder
+ */
+ public Builder ioRegistries(final List<Object> ioRegistryClassNames) {
+ this.ioRegistries.addAll(tryCreateIoRegistry(ioRegistryClassNames));
+ return this;
+ }
+
+ /**
+ * The {@code IoRegistry} class name to use for the {@code GryoPool}
+ *
+ * @param ioRegistryClassName a class name
+ * @return the update builder
+ */
+ public Builder ioRegistry(final Object ioRegistryClassName) {
+ this.ioRegistries.addAll(tryCreateIoRegistry(Collections.singletonList(ioRegistryClassName)));
return this;
}
+ /**
+ * The size of the {@code GryoPool}. The size can not be changed once created.
+ *
+ * @param poolSize the pool size
+ * @return the updated builder
+ */
public Builder poolSize(int poolSize) {
this.poolSize = poolSize;
return this;
}
+ /**
+ * The type of {@code GryoPool} to support -- see {@code Type}
+ *
+ * @param type the pool type
+ * @return the updated builder
+ */
public Builder type(final Type type) {
this.type = type;
return this;
}
+ /**
+ * A consumer to update the {@code GryoMapper.Builder} once constructed.
+ *
+ * @param gryoMapperConsumer the {@code GryoMapper.Builder} consumer
+ * @return the updated builder
+ */
public Builder initializeMapper(final Consumer<GryoMapper.Builder> gryoMapperConsumer) {
this.gryoMapperConsumer = gryoMapperConsumer;
return this;
}
+ /**
+ * A consumer to update all the {@link Kryo} instances for all {@code GryoReader} and {@code GryoWriter} instances.
+ *
+ * @param kryoConsumer the consumer
+ * @return the updated builder
+ */
public Builder initializeKryo(final Consumer<Kryo> kryoConsumer) {
this.kryoConsumer = kryoConsumer;
return this;
}
+ /**
+ * Create the {@code GryoPool} from this builder.
+ *
+ * @return the new pool
+ */
public GryoPool create() {
final GryoMapper.Builder mapper = GryoMapper.build();
final GryoPool gryoPool = new GryoPool();
- if (null != this.configuration)
- tryCreateIoRegistry(this.configuration.getList(CONFIG_IO_REGISTRY, Collections.emptyList())).forEach(mapper::addRegistry);
+ if (null != this.ioRegistries)
+ this.ioRegistries.forEach(mapper::addRegistry);
if (null != this.gryoMapperConsumer)
this.gryoMapperConsumer.accept(mapper);
gryoPool.createPool(this.poolSize, this.type, mapper.create());
@@ -221,5 +203,32 @@ public final class GryoPool {
}
return gryoPool;
}
+
+ /////
+
+ private static List<IoRegistry> tryCreateIoRegistry(final List<Object> classNames) {
+ if (classNames.isEmpty()) return Collections.emptyList();
+
+ final List<IoRegistry> registries = new ArrayList<>();
+ classNames.forEach(c -> {
+ try {
+ final String className = c.toString();
+ final Class<?> clazz = Class.forName(className);
+ try {
+ final Method instanceMethod = clazz.getDeclaredMethod("getInstance");
+ if (IoRegistry.class.isAssignableFrom(instanceMethod.getReturnType()))
+ registries.add((IoRegistry) instanceMethod.invoke(null));
+ else
+ throw new Exception();
+ } catch (Exception methodex) {
+ // tried getInstance() and that failed so try newInstance() no-arg constructor
+ registries.add((IoRegistry) clazz.newInstance());
+ }
+ } catch (Exception ex) {
+ throw new IllegalStateException(ex);
+ }
+ });
+ return registries;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
index ea4bac7..a3ac294 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.IoX;
import org.apache.tinkerpop.gremlin.structure.io.IoXIoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.IoY;
@@ -30,6 +29,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.util.Collections;
import static org.junit.Assert.assertEquals;
@@ -41,7 +41,7 @@ public class GryoPoolTest {
@Test
public void shouldDoWithReaderWriterMethods() throws Exception {
final Configuration conf = new BaseConfiguration();
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) {
pool.doWithWriter(writer -> writer.writeObject(os, 1));
os.flush();
@@ -55,7 +55,7 @@ public class GryoPoolTest {
@Test
public void shouldConfigPoolOnConstructionWithDefaults() throws Exception {
final Configuration conf = new BaseConfiguration();
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
assertReaderWriter(pool.takeWriter(), pool.takeReader(), 1, Integer.class);
}
@@ -63,7 +63,7 @@ public class GryoPoolTest {
public void shouldConfigPoolOnConstructionWithPoolSizeOneAndNoIoRegistry() throws Exception {
final Configuration conf = new BaseConfiguration();
conf.setProperty(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 1);
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
final GryoReader reader = pool.takeReader();
final GryoWriter writer = pool.takeWriter();
@@ -88,7 +88,7 @@ public class GryoPoolTest {
public void shouldConfigPoolOnConstructionWithCustomIoRegistryConstructor() throws Exception {
final Configuration conf = new BaseConfiguration();
conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.ConstructorBased.class.getName());
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
}
@@ -96,7 +96,7 @@ public class GryoPoolTest {
public void shouldConfigPoolOnConstructionWithCustomIoRegistryInstance() throws Exception {
final Configuration conf = new BaseConfiguration();
conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.InstanceBased.class.getName());
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
}
@@ -105,7 +105,7 @@ public class GryoPoolTest {
final Configuration conf = new BaseConfiguration();
conf.setProperty(GryoPool.CONFIG_IO_REGISTRY,
IoXIoRegistry.InstanceBased.class.getName() + "," + IoYIoRegistry.InstanceBased.class.getName());
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoY(100, 200), IoY.class);
}
@@ -113,7 +113,7 @@ public class GryoPoolTest {
@Test(expected = IllegalArgumentException.class)
public void shouldConfigPoolOnConstructionWithoutCustomIoRegistryAndFail() throws Exception {
final Configuration conf = new BaseConfiguration();
- final GryoPool pool = new GryoPool(conf);
+ final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class);
}
@@ -121,7 +121,7 @@ public class GryoPoolTest {
public void shouldConfigPoolOnConstructionWithoutBadIoRegistryAndFail() throws Exception {
final Configuration conf = new BaseConfiguration();
conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, "some.class.that.does.not.exist");
- new GryoPool(conf);
+ GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create();
}
private static <T> void assertReaderWriter(final GryoWriter writer, final GryoReader reader, final T o,
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index b714e2e..9ece0ae 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -23,6 +23,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import java.util.Collections;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -31,21 +33,21 @@ public final class HadoopPools {
private HadoopPools() {
}
- private static GryoPool GRYO_POOL = new GryoPool(256);
+ private static GryoPool GRYO_POOL = GryoPool.build().create();
private static boolean INITIALIZED = false;
public synchronized static void initialize(final Configuration configuration) {
if (!INITIALIZED) {
INITIALIZED = true;
- GRYO_POOL = new GryoPool(configuration);
+ GRYO_POOL = GryoPool.build().
+ poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+ ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+ create();
}
}
public synchronized static void initialize(final org.apache.hadoop.conf.Configuration configuration) {
- if (!INITIALIZED) {
- INITIALIZED = true;
- GRYO_POOL = new GryoPool(ConfUtil.makeApacheConfiguration(configuration));
- }
+ HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration));
}
public static GryoPool getGryoPool() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/802cf6a2/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7b12807..7c409ef 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -43,6 +43,8 @@ import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
import scala.Tuple2;
import scala.runtime.BoxedUnit;
+import java.util.Collections;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -72,7 +74,7 @@ public final class GryoSerializer extends Serializer {
}
this.gryoPool = GryoPool.build().
poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
- configuration(makeApacheConfiguration(sparkConfiguration)).
+ ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
initializeMapper(builder -> {
try {
builder.