You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 21:06:42 UTC
[21/50] [abbrv] tinkerpop git commit: got new
GraphComputer.configuration() model working for SparkGraphComputer and
GiraphGraphComputer.
Got the new GraphComputer.configuration() model working for SparkGraphComputer and GiraphGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/eaf95a5e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/eaf95a5e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/eaf95a5e
Branch: refs/heads/TINKERPOP-1564
Commit: eaf95a5eb623c03c4e933e80232fef8413bb0417
Parents: ad0f094
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 12:51:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 19 13:01:41 2017 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 37 ++++--------
.../computer/AbstractHadoopGraphComputer.java | 33 ++++++++--
.../process/computer/SparkGraphComputer.java | 63 ++++++++------------
3 files changed, 63 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/eaf95a5e/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 3047ee4..db4d6da 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -85,9 +85,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
- super(hadoopGraph);
- final Configuration configuration = hadoopGraph.configuration();
- configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
+ this(hadoopGraph.configuration());
+ }
+
+ private GiraphGraphComputer(final Configuration configuration) {
+ super(configuration);
this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
this.giraphConfiguration.setVertexClass(GiraphVertex.class);
this.giraphConfiguration.setComputationClass(GiraphComputation.class);
@@ -102,31 +104,18 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
- return HadoopGraph.open(configuration).compute(GiraphGraphComputer.class);
+ return new GiraphGraphComputer(configuration);
}
@Override
public Future<ComputerResult> submit(final Graph graph) {
- this.hadoopGraph = (HadoopGraph)graph;
- final Configuration configuration = this.hadoopGraph.configuration();
+ final Configuration configuration = graph.configuration();
+ this.configuration.copy(configuration);
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
return this.submit();
}
@Override
- public GraphComputer workers(final int workers) {
- this.useWorkerThreadsInConfiguration = false;
- return super.workers(workers);
- }
-
- @Override
- public GraphComputer configure(final String key, final Object value) {
- this.giraphConfiguration.set(key, value.toString());
- this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
- return this;
- }
-
- @Override
public GraphComputer program(final VertexProgram vertexProgram) {
super.program(vertexProgram);
this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -145,14 +134,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
}
- @Override
- public org.apache.commons.configuration.Configuration configuration() {
- return ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
- }
-
private Future<ComputerResult> submitWithExecutor(final Executor exec) {
final long startTime = System.currentTimeMillis();
+ this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
+ this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
+ ConfigurationUtils.copy(this.configuration, apacheConfiguration);
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
try {
this.loadJars(giraphConfiguration);
@@ -185,7 +172,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
if (null != this.vertexProgram) {
// a way to verify in Giraph whether the traversal will go over the wire or not
try {
- VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
+ VertexProgram.createVertexProgram(HadoopGraph.open(this.configuration), ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
} catch (final IllegalStateException e) {
if (e.getCause() instanceof NumberFormatException)
throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/eaf95a5e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 344f04e..b95fb7e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -63,7 +64,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
Pattern.compile(File.pathSeparator.equals(":") ? "([^:]|://)+" : ("[^" + File.pathSeparator + "]"));
protected final Logger logger;
- protected HadoopGraph hadoopGraph;
+ protected HadoopConfiguration configuration;
protected boolean executed = false;
protected final Set<MapReduce> mapReducers = new HashSet<>();
protected VertexProgram<Object> vertexProgram;
@@ -75,32 +76,40 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
protected GraphFilter graphFilter = new GraphFilter();
public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) {
- this.hadoopGraph = hadoopGraph;
+ this(hadoopGraph.configuration());
+ }
+
+ protected AbstractHadoopGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+ this.configuration = new HadoopConfiguration(configuration);
this.logger = LoggerFactory.getLogger(this.getClass());
- //GraphComputerHelper.configure(this, this.hadoopGraph.configuration());
+ GraphComputerHelper.configure(this, this.configuration);
}
@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
this.graphFilter.setVertexFilter(vertexFilter);
+ this.configuration.setProperty(VERTICES, vertexFilter);
return this;
}
@Override
public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
this.graphFilter.setEdgeFilter(edgeFilter);
+ this.configuration.setProperty(EDGES, edgeFilter);
return this;
}
@Override
public GraphComputer result(final ResultGraph resultGraph) {
this.resultGraph = resultGraph;
+ this.configuration.setProperty(RESULT, resultGraph.name());
return this;
}
@Override
public GraphComputer persist(final Persist persist) {
this.persist = persist;
+ this.configuration.setProperty(PERSIST, persist.name());
return this;
}
@@ -119,16 +128,28 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public GraphComputer workers(final int workers) {
this.workers = workers;
+ this.configuration.setProperty(WORKERS, workers);
return this;
}
@Override
public Future<ComputerResult> submit(final Graph graph) {
- this.hadoopGraph = (HadoopGraph) graph;
+ ConfigurationUtils.copy(graph.configuration(), this.configuration);
return this.submit();
}
@Override
+ public GraphComputer configure(final String key, final Object value) {
+ this.configuration.setProperty(key,value);
+ return this;
+ }
+
+ @Override
+ public org.apache.commons.configuration.Configuration configuration() {
+ return this.configuration;
+ }
+
+ @Override
public String toString() {
return StringFactory.graphComputerString(this);
}
@@ -239,8 +260,8 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
- if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
- final Object writer = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphWriter(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration()));
+ if (configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
+ final Object writer = ReflectionUtils.newInstance(configuration.getGraphWriter(), ConfUtil.makeHadoopConfiguration(configuration));
if (writer instanceof PersistResultGraphAware)
return ((PersistResultGraphAware) writer).supportsResultGraphPersistCombination(resultGraph, persist);
else {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/eaf95a5e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index cbcdfe7..8e62c62 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -18,7 +18,6 @@
*/
package org.apache.tinkerpop.gremlin.spark.process.computer;
-import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -53,7 +52,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
@@ -73,7 +71,6 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
@@ -94,7 +91,6 @@ import java.util.concurrent.ThreadFactory;
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
- private final org.apache.commons.configuration.Configuration sparkConfiguration;
private boolean workersSet = false;
private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
@@ -117,14 +113,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
public SparkGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
- this.sparkConfiguration = new HadoopConfiguration();
+ }
+
+ private SparkGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+ super(configuration);
}
@Override
public GraphComputer workers(final int workers) {
super.workers(workers);
- if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
- this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
+ if (this.configuration.containsKey(SparkLauncher.SPARK_MASTER) && this.configuration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
+ this.configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
}
this.workersSet = true;
return this;
@@ -132,7 +131,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@Override
public GraphComputer configure(final String key, final Object value) {
- this.sparkConfiguration.setProperty(key, value);
+ this.configuration.setProperty(key, value);
return this;
}
@@ -142,19 +141,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter");
}
- @Override
- public Future<ComputerResult> submit(final Graph graph) {
- ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration);
- return this.submit();
- }
-
- @Override
- public org.apache.commons.configuration.Configuration configuration() {
- return new HadoopConfiguration(this.sparkConfiguration);
- }
-
public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
- return new SparkGraphComputer(HadoopGraph.open(configuration));
+ return new SparkGraphComputer(configuration);
}
private Future<ComputerResult> submitWithExecutor(Executor exec) {
@@ -164,36 +152,33 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
//////////////////////////////////////////////////
/////// PROCESS SHIM AND SYSTEM PROPERTIES ///////
//////////////////////////////////////////////////
- ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
- final String shimService = KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
+ final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.configuration);
+ if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
+ graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
+ if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
+ graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
+ }
+ final String shimService = KryoSerializer.class.getCanonicalName().equals(graphComputerConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ?
UnshadedKryoShimService.class.getCanonicalName() :
HadoopPoolShimService.class.getCanonicalName();
- this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
- ///////////
+ graphComputerConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
final StringBuilder params = new StringBuilder();
- this.sparkConfiguration.getKeys().forEachRemaining(key -> {
+ graphComputerConfiguration.getKeys().forEachRemaining(key -> {
if (KEYS_PASSED_IN_JVM_SYSTEM_PROPERTIES.contains(key)) {
- params.append(" -D").append("tinkerpop.").append(key).append("=").append(this.sparkConfiguration.getProperty(key));
- System.setProperty("tinkerpop." + key, this.sparkConfiguration.getProperty(key).toString());
+ params.append(" -D").append("tinkerpop.").append(key).append("=").append(graphComputerConfiguration.getProperty(key));
+ System.setProperty("tinkerpop." + key, graphComputerConfiguration.getProperty(key).toString());
}
});
if (params.length() > 0) {
- this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
- (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
- this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
- (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
+ graphComputerConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+ (graphComputerConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
+ graphComputerConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ (graphComputerConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim());
}
- KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration);
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
//////////////////////////////////////////////////
//////////////////////////////////////////////////
//////////////////////////////////////////////////
- // apache and hadoop configurations that are used throughout the graph computer computation
- final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
- if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
- graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName());
- if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
- graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName());
- }
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);