You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/06/13 21:59:09 UTC

[2/6] tinkerpop git commit: Allow DFS paths in `HADOOP_GREMLIN_LIBS`.

Allow DFS paths in `HADOOP_GREMLIN_LIBS`.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5e96f353
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5e96f353
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5e96f353

Branch: refs/heads/master
Commit: 5e96f353b95205b701fc9663aec87183746badf4
Parents: e790e56
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Thu Jun 9 13:46:34 2016 +0200
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Mon Jun 13 09:08:09 2016 +0200

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 53 ++++++++------------
 .../computer/AbstractHadoopGraphComputer.java   | 46 +++++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 28 ++---------
 3 files changed, 72 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 012b9fc..b06b40a 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -61,13 +61,14 @@ import org.apache.tinkerpop.gremlin.util.Gremlin;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.NotSerializableException;
+import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -133,8 +134,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             try {
-                final FileSystem fs = FileSystem.get(this.giraphConfiguration);
-                this.loadJars(fs);
+                this.loadJars(giraphConfiguration);
                 ToolRunner.run(this, new String[]{});
             } catch (final Exception e) {
                 //e.printStackTrace();
@@ -247,36 +247,25 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         return this.giraphConfiguration;
     }
 
-    private void loadJars(final FileSystem fs) {
-        final String hadoopGremlinLibsRemote = "hadoop-gremlin-" + Gremlin.version() + "-libs";
-        if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLibsLocal = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLibsLocal)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                final String[] paths = hadoopGremlinLibsLocal.split(":");
-                for (final String path : paths) {
-                    final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
-                    if (file.exists()) {
-                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
-                            try {
-                                final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
-                                if (!fs.exists(jarFile))
-                                    fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
-                                try {
-                                    DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
-                                } catch (final Exception e) {
-                                    throw new RuntimeException(e.getMessage(), e);
-                                }
-                            } catch (final Exception e) {
-                                throw new IllegalStateException(e.getMessage(), e);
-                            }
-                        });
-                    } else {
-                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                    }
-                }
+    /**
+     * Stages {@code file} (a jar on the local file system) in {@code <home>/hadoop-gremlin-<version>-libs/} of the
+     * default file system, uploading it only if no file of that name is there yet, and adds it to the job classpath.
+     */
+    @Override
+    protected void loadJar(final org.apache.hadoop.conf.Configuration hadoopConfiguration, final File file, final Object... params)
+            throws IOException {
+        final FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration);
+        try {
+            final Path jarFile = new Path(defaultFileSystem.getHomeDirectory() + "/hadoop-gremlin-" + Gremlin.version() + "-libs/" + file.getName());
+            if (!defaultFileSystem.exists(jarFile)) {
+                // copyFromLocalFile() writes into the FileSystem it is invoked on, so it must be the one jarFile is
+                // qualified against (and exists() was checked on), not one derived from the local source path
+                defaultFileSystem.copyFromLocalFile(new Path(file.getPath()), jarFile);
             }
+            // register on the configuration handed in by loadJars(); a single catch keeps the real failure as the cause
+            DistributedCache.addArchiveToClassPath(jarFile, hadoopConfiguration, defaultFileSystem);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index a05a1be..f5f332d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -41,15 +42,22 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public abstract class AbstractHadoopGraphComputer implements GraphComputer {
 
+    private final static Pattern PATH_PATTERN = Pattern.compile("([^:]|://)+");
+
     protected final Logger logger;
     protected final HadoopGraph hadoopGraph;
     protected boolean executed = false;
@@ -139,6 +147,74 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
             throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
     }
 
+    /**
+     * Hands every {@code .jar} found in the directories listed by {@code HADOOP_GREMLIN_LIBS} to {@link #loadJar}.
+     * <p>
+     * Does nothing when {@code GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE} is {@code false} (it defaults to {@code true}).
+     * The list is read from the system property of that name, falling back to the environment variable, and is
+     * split on {@code ':'} except that {@code "://"} is kept intact, so entries may be URIs such as
+     * {@code hdfs:///gremlin/libs}. NOTE(review): an entry with a port ({@code host:8020}) is still split there.
+     * Each entry is resolved against the file system its URI names, passed through
+     * {@code copyDirectoryIfNonExistent}, and the {@code .jar} files directly inside the resulting directory are
+     * loaded; an entry that does not yield a listable directory only logs a warning.
+     *
+     * @param hadoopConfiguration used to resolve file systems; forwarded to {@link #loadJar}
+     * @param params              implementation-specific arguments, forwarded unchanged to {@link #loadJar}
+     * @throws IllegalStateException wrapping any {@link IOException} raised while resolving or loading
+     */
+    protected void loadJars(final Configuration hadoopConfiguration, final Object... params) {
+        if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true))
+            return;
+        final String hadoopGremlinLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
+        if (null == hadoopGremlinLibs) {
+            this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            return;
+        }
+        try {
+            final Matcher matcher = PATH_PATTERN.matcher(hadoopGremlinLibs);
+            while (matcher.find()) {
+                final String path = matcher.group();
+                final File directory = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(resolveFileSystem(hadoopConfiguration, path), path);
+                // listFiles() returns null for a regular file or on an I/O error, so exists() alone is not a sufficient guard
+                final File[] files = directory.isDirectory() ? directory.listFiles() : null;
+                if (null == files) {
+                    this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                    continue;
+                }
+                for (final File f : files) {
+                    if (f.getName().endsWith(Constants.DOT_JAR))
+                        this.loadJar(hadoopConfiguration, f, params);
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the file system named by {@code path} when it parses as a URI (a scheme-less URI yields the
+     * default file system), and the default file system when it does not.
+     */
+    private static FileSystem resolveFileSystem(final Configuration hadoopConfiguration, final String path) throws IOException {
+        try {
+            return FileSystem.get(new URI(path), hadoopConfiguration);
+        } catch (final URISyntaxException ignored) {
+            // not a well-formed URI: fall back to the default file system
+            return FileSystem.get(hadoopConfiguration);
+        }
+    }
+
+    /**
+     * Makes a single jar available to the computation; invoked by {@link #loadJars} for every {@code .jar} it finds.
+     *
+     * @param hadoopConfiguration the configuration passed to {@link #loadJars}
+     * @param file                a {@code .jar} file located by {@link #loadJars}
+     * @param params              the implementation-specific arguments passed to {@link #loadJars}
+     * @throws IOException if the jar cannot be loaded; {@link #loadJars} rethrows it as an IllegalStateException
+     */
+    protected abstract void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params)
+            throws IOException;
+
     @Override
     public Features features() {
         return new Features();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 5178225..40598c0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -209,7 +208,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
             try {
                 final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-                this.loadJars(sparkContext, hadoopConfiguration); // add the project jars to the cluster
+                this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster
                 Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
@@ -384,27 +383,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     /////////////////
 
-    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
-        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLocalLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLocalLibs)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                try {
-                    final String[] paths = hadoopGremlinLocalLibs.split(":");
-                    final FileSystem fs = FileSystem.get(hadoopConfiguration);
-                    for (final String path : paths) {
-                        final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path);
-                        if (file.exists())
-                            Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
-                        else
-                            this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                    }
-                } catch (IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            }
-        }
+    @Override
+    protected void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) {
+        // params[0] must be the JavaSparkContext given to loadJars(); addJar() makes the jar available to its future tasks
+        ((JavaSparkContext) params[0]).addJar(file.getAbsolutePath());
     }
 
     /**