You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/25 16:09:14 UTC

tinkerpop git commit: use System properties to propagate the shim class to workers. This is identical to the previous META-INF service model, except that no META-INF service is used.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 49924cc66 -> 576dd8ec8


Use System properties to propagate the shim class to workers. This is identical to the previous META-INF service model, except that no META-INF service is used.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/576dd8ec
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/576dd8ec
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/576dd8ec

Branch: refs/heads/TINKERPOP-1389
Commit: 576dd8ec8ce91aef1b20e8f820f2688e3a37eb88
Parents: 49924cc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 25 10:09:07 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 25 10:09:07 2016 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java     |  3 ---
 .../io/gryo/kryoshim/KryoShimServiceLoader.java   | 12 ++++++++++--
 .../structure/io/RecordReaderWriterTest.java      |  2 ++
 .../process/computer/SparkGraphComputer.java      | 18 ++++++++++++++++--
 4 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 6ffd5ea..b06b40a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -58,7 +57,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -84,7 +82,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
-        System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); // HadoopPools only with Giraph
         final Configuration configuration = hadoopGraph.configuration();
         configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
         this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index ac815b1..2edbc78 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
 
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +62,15 @@ public class KryoShimServiceLoader {
     public static KryoShimService load(final boolean forceReload) {
         if (null != cachedShimService && !forceReload)
             return cachedShimService;
-        if (!configuration.containsKey(KRYO_SHIM_SERVICE))
-            throw new IllegalArgumentException("The provided configuration does not contain a " + KRYO_SHIM_SERVICE + " property");
+        if (null == configuration)
+            configuration = new BaseConfiguration();
+        if (!configuration.containsKey(KRYO_SHIM_SERVICE)) {
+            final String systemShimService = System.getProperty(KRYO_SHIM_SERVICE, null);
+            if (null == systemShimService)
+                throw new IllegalStateException("There is no configured shim, nor shim specified in the System properties");
+            log.info("Using the KryoShimService registered with the System properties: " + systemShimService);
+            configuration.setProperty(KRYO_SHIM_SERVICE, systemShimService);
+        }
 
         try {
             cachedShimService = ((Class<? extends KryoShimService>) Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
index f3c079b..ea5686a 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -83,6 +84,7 @@ public abstract class RecordReaderWriterTest {
         configuration.set("fs.file.impl", LocalFileSystem.class.getName());
         configuration.set("fs.defaultFS", "file:///");
         configuration.set("mapreduce.output.fileoutputformat.outputdir", "file:///" + outputDirectory.getAbsolutePath());
+        configuration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName());
         return configuration;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 42f2493..80e7785 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -69,6 +69,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -108,12 +109,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         super(hadoopGraph);
         this.sparkConfiguration = new HadoopConfiguration();
         ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+        ///////////////////////////////////////////////////////
+        // Handle the KryoShimService for data serialization //
+        ///////////////////////////////////////////////////////
         if (HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()))) {
             this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
                     KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
                             UnshadedKryoShimService.class.getCanonicalName() :
                             HadoopPoolShimService.class.getCanonicalName());
         }
+        final String shimService = this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE);
+        this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+                (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+        this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+                (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+        System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
     }
 
     @Override
@@ -144,8 +154,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
             final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            //if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-            //    graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            // if no serializer is provided then use the default of KryoSerializer+GryoRegistrator
+            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
+                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
+                if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
+                    graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
+            }
             graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);