You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/06/13 21:59:09 UTC
[2/6] tinkerpop git commit: Allow DFS paths in `HADOOP_GREMLIN_LIBS`.
Allow DFS paths in `HADOOP_GREMLIN_LIBS`.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5e96f353
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5e96f353
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5e96f353
Branch: refs/heads/master
Commit: 5e96f353b95205b701fc9663aec87183746badf4
Parents: e790e56
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Jun 9 13:46:34 2016 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Mon Jun 13 09:08:09 2016 +0200
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 53 ++++++++------------
.../computer/AbstractHadoopGraphComputer.java | 46 +++++++++++++++++
.../process/computer/SparkGraphComputer.java | 28 ++---------
3 files changed, 72 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 012b9fc..b06b40a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -61,13 +61,14 @@ import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.File;
+import java.io.IOException;
import java.io.NotSerializableException;
+import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -133,8 +134,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
try {
- final FileSystem fs = FileSystem.get(this.giraphConfiguration);
- this.loadJars(fs);
+ this.loadJars(giraphConfiguration);
ToolRunner.run(this, new String[]{});
} catch (final Exception e) {
//e.printStackTrace();
@@ -247,36 +247,25 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
return this.giraphConfiguration;
}
- private void loadJars(final FileSystem fs) {
- final String hadoopGremlinLibsRemote = "hadoop-gremlin-" + Gremlin.version() + "-libs";
- if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
- final String hadoopGremlinLibsLocal = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
- if (null == hadoopGremlinLibsLocal)
- this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
- else {
- final String[] paths = hadoopGremlinLibsLocal.split(":");
- for (final String path : paths) {
- final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
- if (file.exists()) {
- Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
- try {
- final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
- if (!fs.exists(jarFile))
- fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
- try {
- DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
- } catch (final Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
- } else {
- this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
- }
- }
+ @Override // Giraph hook: stage one jar under the user's home directory and register it for the job; params is not read here
+ protected void loadJar(final org.apache.hadoop.conf.Configuration hadoopConfiguration, final File file, final Object... params)
+ throws IOException {
+ final FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration); // file system resolved from the configuration alone
+ try {
+ final Path jarFile = new Path(defaultFileSystem.getHomeDirectory() + "/hadoop-gremlin-" + Gremlin.version() + "-libs/" + file.getName()); // target: <home>/hadoop-gremlin-<version>-libs/<jar name>
+ if (!defaultFileSystem.exists(jarFile)) { // copy only when the target is not there yet
+ final Path sourcePath = new Path(file.getPath());
+ final URI sourceUri = sourcePath.toUri();
+ final FileSystem fs = FileSystem.get(sourceUri, hadoopConfiguration); // file system resolved from the source path's URI
+ fs.copyFromLocalFile(sourcePath, jarFile); // NOTE(review): the copy is issued on the source-derived fs, the existence check on defaultFileSystem -- confirm both always agree
}
+ try {
+ DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, defaultFileSystem); // registers against this.giraphConfiguration, not the hadoopConfiguration argument
+ } catch (final Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ } catch (final Exception e) { // NOTE(review): this also catches the RuntimeException thrown just above, so callers see IllegalStateException
+ throw new IllegalStateException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index a05a1be..f5f332d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -41,15 +42,22 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public abstract class AbstractHadoopGraphComputer implements GraphComputer {
+ private final static Pattern PATH_PATTERN = Pattern.compile("([^:]|://)+"); // one match per list entry: a run of non-colon characters and/or literal colon-slash-slash sequences; NOTE(review): any other colon (e.g. in host:port) ends the match
+
protected final Logger logger;
protected final HadoopGraph hadoopGraph;
protected boolean executed = false;
@@ -139,6 +147,44 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
}
+ protected void loadJars(final Configuration hadoopConfiguration, final Object... params) { // resolves HADOOP_GREMLIN_LIBS and hands each matching file to loadJar(); params is passed through unchanged
+ if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) { // treated as true when the key is absent
+ final String hadoopGremlinLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS); // system property wins over the environment variable
+ if (null == hadoopGremlinLibs)
+ this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+ else {
+ try {
+ final Matcher matcher = PATH_PATTERN.matcher(hadoopGremlinLibs); // iterate over the entries of the list
+ while (matcher.find()) {
+ final String path = matcher.group();
+ FileSystem fs;
+ try {
+ final URI uri = new URI(path);
+ fs = FileSystem.get(uri, hadoopConfiguration); // file system chosen from the entry's URI
+ } catch (URISyntaxException e) {
+ fs = FileSystem.get(hadoopConfiguration); // entry is not a parseable URI: fall back to the configuration's file system
+ }
+ final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path); // helper not shown in this diff; presumably yields a local directory for the entry -- confirm
+ if (file.exists()) {
+ for (final File f : file.listFiles()) { // NOTE(review): listFiles() returns null when file is not a directory
+ if (f.getName().endsWith(Constants.DOT_JAR)) { // only names ending with Constants.DOT_JAR are loaded
+ loadJar(hadoopConfiguration, f, params);
+ }
+ }
+ }
+ else
+ this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+ }
+ } catch (IOException e) { // IOException from file-system lookup or from loadJar() is rethrown unchecked
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ protected abstract void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) // per-jar hook called by loadJars(); params is whatever the caller passed to loadJars()
+ throws IOException;
+
@Override
public Features features() {
return new Features();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 5178225..40598c0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
-import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -209,7 +208,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// execute the vertex program and map reducers and if there is a failure, auto-close the spark context
try {
final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
- this.loadJars(sparkContext, hadoopConfiguration); // add the project jars to the cluster
+ this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster
Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
updateLocalConfiguration(sparkContext, sparkConfiguration);
// create a message-passing friendly rdd from the input rdd
@@ -384,27 +383,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
/////////////////
- private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
- if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
- final String hadoopGremlinLocalLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
- if (null == hadoopGremlinLocalLibs)
- this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
- else {
- try {
- final String[] paths = hadoopGremlinLocalLibs.split(":");
- final FileSystem fs = FileSystem.get(hadoopConfiguration);
- for (final String path : paths) {
- final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
- if (file.exists())
- Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
- else
- this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
- }
- } catch (IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
+ @Override // Spark hook: registers one jar with the Spark context; hadoopConfiguration is not read here
+ protected void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) {
+ final JavaSparkContext sparkContext = (JavaSparkContext) params[0]; // params[0] must be the JavaSparkContext; the loadJars(hadoopConfiguration, sparkContext) call in this commit supplies it
+ sparkContext.addJar(file.getAbsolutePath());
}
/**