You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/09/12 17:15:53 UTC
[2/4] tinkerpop git commit: TINKERPOP-1426, GryoSerializer should implement Java serialization interface
TINKERPOP-1426, GryoSerializer should implement Java serialization interface
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/28b68684
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/28b68684
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/28b68684
Branch: refs/heads/TINKERPOP-1389
Commit: 28b686841f00201617a1a6fdf38b2ee90e5c8a44
Parents: 1e0d7de
Author: yucx <yu...@cn.ibm.com>
Authored: Fri Sep 2 02:00:41 2016 -0700
Committer: yucx <yu...@cn.ibm.com>
Committed: Fri Sep 2 03:33:12 2016 -0700
----------------------------------------------------------------------
.../spark/structure/io/gryo/GryoSerializer.java | 39 ++++++++++++++++----
1 file changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/28b68684/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55..e64cda2 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -48,24 +48,32 @@ import scala.Tuple3;
import scala.collection.mutable.WrappedArray;
import scala.runtime.BoxedUnit;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
//private final Option<String> userRegistrator;
private final int bufferSize;
private final int maxBufferSize;
+ private final int poolSize;
+ private final ArrayList<String> ioRegList = new ArrayList<>();
+ private final boolean referenceTracking;
+ private final boolean registrationRequired;
- private final GryoPool gryoPool;
+
+ private transient GryoPool gryoPool;
public GryoSerializer(final SparkConf sparkConfiguration) {
final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
- final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
- final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+ referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+ registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
} else {
@@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer {
//this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
}
}
- this.gryoPool = GryoPool.build().
- poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
- ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+ poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+ List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+ list.forEach(c -> {
+ ioRegList.add(c.toString());
+ }
+ );
+ }
+
+ private GryoPool createPool(){
+ List<Object> list = new ArrayList<>(ioRegList);
+ return GryoPool.build().
+ poolSize(poolSize).
+ ioRegistries(list).
initializeMapper(builder -> {
try {
builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer {
}
public GryoPool getGryoPool() {
+ if (gryoPool == null) {
+ synchronized (this) {
+ if (gryoPool == null) {
+ gryoPool = createPool();
+ }
+ }
+ }
return this.gryoPool;
}