You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/25 16:09:14 UTC
tinkerpop git commit: using Java system properties (System.setProperty()/System.getProperty())
to propagate the shim class to workers. This is identical to the previous META-INF service model,
except that no META-INF service is used.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1389 49924cc66 -> 576dd8ec8
Using Java system properties (System.setProperty()/System.getProperty()) to propagate the shim class to workers. This is identical to the previous META-INF service model, except that no META-INF service is used.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/576dd8ec
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/576dd8ec
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/576dd8ec
Branch: refs/heads/TINKERPOP-1389
Commit: 576dd8ec8ce91aef1b20e8f820f2688e3a37eb88
Parents: 49924cc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 25 10:09:07 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 25 10:09:07 2016 -0600
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 3 ---
.../io/gryo/kryoshim/KryoShimServiceLoader.java | 12 ++++++++++--
.../structure/io/RecordReaderWriterTest.java | 2 ++
.../process/computer/SparkGraphComputer.java | 18 ++++++++++++++++--
4 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 6ffd5ea..b06b40a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -58,7 +57,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -84,7 +82,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
- System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); // HadoopPools only with Giraph
final Configuration configuration = hadoopGraph.configuration();
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index ac815b1..2edbc78 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +62,15 @@ public class KryoShimServiceLoader {
public static KryoShimService load(final boolean forceReload) {
if (null != cachedShimService && !forceReload)
return cachedShimService;
- if (!configuration.containsKey(KRYO_SHIM_SERVICE))
- throw new IllegalArgumentException("The provided configuration does not contain a " + KRYO_SHIM_SERVICE + " property");
+ if (null == configuration)
+ configuration = new BaseConfiguration();
+ if (!configuration.containsKey(KRYO_SHIM_SERVICE)) {
+ final String systemShimService = System.getProperty(KRYO_SHIM_SERVICE, null);
+ if (null == systemShimService)
+ throw new IllegalStateException("There is no configured shim, nor shim specified in the System properties");
+ log.info("Using the KryoShimService registered with the System properties: " + systemShimService);
+ configuration.setProperty(KRYO_SHIM_SERVICE, systemShimService);
+ }
try {
cachedShimService = ((Class<? extends KryoShimService>) Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
index f3c079b..ea5686a 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Test;
import org.slf4j.Logger;
@@ -83,6 +84,7 @@ public abstract class RecordReaderWriterTest {
configuration.set("fs.file.impl", LocalFileSystem.class.getName());
configuration.set("fs.defaultFS", "file:///");
configuration.set("mapreduce.output.fileoutputformat.outputdir", "file:///" + outputDirectory.getAbsolutePath());
+ configuration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName());
return configuration;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 42f2493..80e7785 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -69,6 +69,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -108,12 +109,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
super(hadoopGraph);
this.sparkConfiguration = new HadoopConfiguration();
ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+ ///////////////////////////////////////////////////////
+ // Handle the KryoShimService for data serialization //
+ ///////////////////////////////////////////////////////
if (HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()))) {
this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
UnshadedKryoShimService.class.getCanonicalName() :
HadoopPoolShimService.class.getCanonicalName());
}
+ final String shimService = this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE);
+ this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+ (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+ this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+ System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
}
@Override
@@ -144,8 +154,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
- //if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
- // graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ // if no serializer is provided then use the default of KryoSerializer+GryoRegistrator
+ if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
+ graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
+ if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
+ graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
+ }
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);