You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/19 19:51:48 UTC
tinkerpop git commit: Got the new GraphComputer.configuration() model
working for SparkGraphComputer and GiraphGraphComputer.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 70b0b576f -> 9d12dae58
Got the new GraphComputer.configuration() model working for SparkGraphComputer and GiraphGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9d12dae5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9d12dae5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9d12dae5
Branch: refs/heads/TINKERPOP-1564
Commit: 9d12dae58d0c7d0d46e857c6f2477310e7d2758e
Parents: 70b0b57
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 12:51:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 19 12:51:41 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 37 +++++++-------------
.../computer/AbstractHadoopGraphComputer.java | 33 +++++++++++++----
.../process/computer/SparkGraphComputer.java | 28 +++++----------
3 files changed, 48 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index f4c7958..7d3e236 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -83,9 +83,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
- super(hadoopGraph);
- final Configuration configuration = hadoopGraph.configuration();
- configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
+ this(hadoopGraph.configuration());
+ }
+
+ private GiraphGraphComputer(final Configuration configuration) {
+ super(configuration);
this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
this.giraphConfiguration.setVertexClass(GiraphVertex.class);
this.giraphConfiguration.setComputationClass(GiraphComputation.class);
@@ -100,31 +102,18 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
- return HadoopGraph.open(configuration).compute(GiraphGraphComputer.class);
+ return new GiraphGraphComputer(configuration);
}
@Override
public Future<ComputerResult> submit(final Graph graph) {
- this.hadoopGraph = (HadoopGraph)graph;
- final Configuration configuration = this.hadoopGraph.configuration();
+ final Configuration configuration = graph.configuration();
+ this.configuration.copy(configuration);
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
return this.submit();
}
@Override
- public GraphComputer workers(final int workers) {
- this.useWorkerThreadsInConfiguration = false;
- return super.workers(workers);
- }
-
- @Override
- public GraphComputer configure(final String key, final Object value) {
- this.giraphConfiguration.set(key, value.toString());
- this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
- return this;
- }
-
- @Override
public GraphComputer program(final VertexProgram vertexProgram) {
super.program(vertexProgram);
this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -143,14 +132,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
}
- @Override
- public org.apache.commons.configuration.Configuration configuration() {
- return ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
- }
-
private Future<ComputerResult> submitWithExecutor(final Executor exec) {
final long startTime = System.currentTimeMillis();
+ this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
+ this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
+ ConfigurationUtils.copy(this.configuration, apacheConfiguration);
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
try {
this.loadJars(giraphConfiguration);
@@ -183,7 +170,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
if (null != this.vertexProgram) {
// a way to verify in Giraph whether the traversal will go over the wire or not
try {
- VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
+ VertexProgram.createVertexProgram(HadoopGraph.open(this.configuration), ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
} catch (final IllegalStateException e) {
if (e.getCause() instanceof NumberFormatException)
throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 344f04e..b95fb7e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -63,7 +64,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
Pattern.compile(File.pathSeparator.equals(":") ? "([^:]|://)+" : ("[^" + File.pathSeparator + "]"));
protected final Logger logger;
- protected HadoopGraph hadoopGraph;
+ protected HadoopConfiguration configuration;
protected boolean executed = false;
protected final Set<MapReduce> mapReducers = new HashSet<>();
protected VertexProgram<Object> vertexProgram;
@@ -75,32 +76,40 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
protected GraphFilter graphFilter = new GraphFilter();
public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) {
- this.hadoopGraph = hadoopGraph;
+ this(hadoopGraph.configuration());
+ }
+
+ protected AbstractHadoopGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+ this.configuration = new HadoopConfiguration(configuration);
this.logger = LoggerFactory.getLogger(this.getClass());
- //GraphComputerHelper.configure(this, this.hadoopGraph.configuration());
+ GraphComputerHelper.configure(this, this.configuration);
}
@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
this.graphFilter.setVertexFilter(vertexFilter);
+ this.configuration.setProperty(VERTICES, vertexFilter);
return this;
}
@Override
public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
this.graphFilter.setEdgeFilter(edgeFilter);
+ this.configuration.setProperty(EDGES, edgeFilter);
return this;
}
@Override
public GraphComputer result(final ResultGraph resultGraph) {
this.resultGraph = resultGraph;
+ this.configuration.setProperty(RESULT, resultGraph.name());
return this;
}
@Override
public GraphComputer persist(final Persist persist) {
this.persist = persist;
+ this.configuration.setProperty(PERSIST, persist.name());
return this;
}
@@ -119,16 +128,28 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public GraphComputer workers(final int workers) {
this.workers = workers;
+ this.configuration.setProperty(WORKERS, workers);
return this;
}
@Override
public Future<ComputerResult> submit(final Graph graph) {
- this.hadoopGraph = (HadoopGraph) graph;
+ ConfigurationUtils.copy(graph.configuration(), this.configuration);
return this.submit();
}
@Override
+ public GraphComputer configure(final String key, final Object value) {
+ this.configuration.setProperty(key,value);
+ return this;
+ }
+
+ @Override
+ public org.apache.commons.configuration.Configuration configuration() {
+ return this.configuration;
+ }
+
+ @Override
public String toString() {
return StringFactory.graphComputerString(this);
}
@@ -239,8 +260,8 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
- if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
- final Object writer = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphWriter(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration()));
+ if (configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
+ final Object writer = ReflectionUtils.newInstance(configuration.getGraphWriter(), ConfUtil.makeHadoopConfiguration(configuration));
if (writer instanceof PersistResultGraphAware)
return ((PersistResultGraphAware) writer).supportsResultGraphPersistCombination(resultGraph, persist);
else {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 59497e0..c72454b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -86,7 +86,6 @@ import java.util.concurrent.ThreadFactory;
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
- private final org.apache.commons.configuration.Configuration sparkConfiguration;
private boolean workersSet = false;
private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
@@ -105,15 +104,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
public SparkGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
- this.sparkConfiguration = new HadoopConfiguration();
- ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+ }
+
+ private SparkGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+ super(configuration);
}
@Override
public GraphComputer workers(final int workers) {
super.workers(workers);
- if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
- this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
+ if (this.configuration.containsKey(SparkLauncher.SPARK_MASTER) && this.configuration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
+ this.configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
}
this.workersSet = true;
return this;
@@ -121,7 +122,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@Override
public GraphComputer configure(final String key, final Object value) {
- this.sparkConfiguration.setProperty(key, value);
+ this.configuration.setProperty(key, value);
return this;
}
@@ -131,19 +132,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter");
}
- @Override
- public Future<ComputerResult> submit(final Graph graph) {
- ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration);
- return this.submit();
- }
-
- @Override
- public org.apache.commons.configuration.Configuration configuration() {
- return new HadoopConfiguration(this.sparkConfiguration);
- }
-
public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
- return new SparkGraphComputer(HadoopGraph.open(configuration));
+ return new SparkGraphComputer(configuration);
}
private Future<ComputerResult> submitWithExecutor(Executor exec) {
@@ -151,7 +141,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
return computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
- final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+ final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.configuration);
if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));