You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pe...@apache.org on 2016/02/18 09:55:20 UTC

falcon git commit: FALCON-887 Support for multiple lib paths in falcon process

Repository: falcon
Updated Branches:
  refs/heads/master 237bab6ed -> 54402764c


FALCON-887 Support for multiple lib paths in falcon process

Author: Sowmya Ramesh <sr...@hortonworks.com>

Reviewers: Peeyush Bishnoi <pe...@apache.org>, Pavan Kumar<pa...@apache.org>

Closes #38 from sowmyaramesh/FALCON-887 and squashes the following commits:

95865df [Sowmya Ramesh] Moved getting lib path to private method
3209a7e [Sowmya Ramesh] Applied feedback
aa8f6a2 [Sowmya Ramesh] FALCON-887 Fixed checkstyle errors
ad4d455 [Sowmya Ramesh] FALCON-887 Support for multiple lib paths in falcon process


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/54402764
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/54402764
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/54402764

Branch: refs/heads/master
Commit: 54402764c124cd12645e24fa2d3cf2283f4c13bd
Parents: 237bab6
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Thu Feb 18 14:25:03 2016 +0530
Committer: peeyush b <pb...@hortonworks.com>
Committed: Thu Feb 18 14:25:03 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/EntityUtil.java    |  1 +
 .../entity/parser/ProcessEntityParser.java      |  9 +++-
 docs/src/site/twiki/EntitySpecification.twiki   |  6 ++-
 .../apache/falcon/oozie/OozieBundleBuilder.java | 26 +++++++++--
 .../ProcessExecutionWorkflowBuilder.java        | 49 +++++++++++---------
 6 files changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 297e508..6861519 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@ Trunk
     FALCON-1770 Update README file (Ajay Yadava)
 
   BUG FIXES
+    FALCON-887 Support for multiple lib paths in falcon process (Sowmya Ramesh)
+
     FALCON-1795 Kill api not killing waiting/ready instances
    
     FALCON-1804 Non-SLA feed throws NullPointerException.

http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 24dbf3d..9c03de3 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -89,6 +89,7 @@ public final class EntityUtil {
     public static final String MR_JOB_PRIORITY = "jobPriority";
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    public static final String WF_LIB_SEPARATOR = ",";
     private static final String STAGING_DIR_NAME_SEPARATOR = "_";
 
     /** Priority with which the DAG will be scheduled.

http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 10a5265..eec6f69 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -173,8 +173,13 @@ public class ProcessEntityParser extends EntityParser<Process> {
                         "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
             }
 
-            if (StringUtils.isNotEmpty(libPath) && !fs.exists(new Path(libPath))) {
-                throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode);
+            if (StringUtils.isNotBlank(libPath)) {
+                String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR);
+                for (String path : libPaths) {
+                    if (!fs.exists(new Path(path))) {
+                        throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode);
+                    }
+                }
             }
         } catch (IOException e) {
             throw new FalconException("Error validating workflow path " + workflowPath, e);

http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 6f24d8f..b3d80e2 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -821,6 +821,7 @@ The following are some special properties, which when present are used by the Fa
 ---+++ Workflow
 
 The workflow defines the workflow engine that should be used and the path to the workflow on hdfs.
+Libraries required by the workflow can be specified with the lib attribute of the workflow element, as a comma-separated list of HDFS paths.
 The workflow definition on hdfs contains the actual job that should run and it should confirm to
 the workflow specification of the engine specified. The libraries required by the workflow should
 be in lib folder inside the workflow path.
@@ -840,7 +841,7 @@ Syntax:
 <verbatim>
 <process name="[process name]">
 ...
-    <workflow engine=[workflow engine] path=[workflow path]/>
+    <workflow engine=[workflow engine] path=[workflow path] lib=[comma separated lib paths]/>
 ...
 </process>
 </verbatim>
@@ -856,6 +857,7 @@ Example:
 
 This defines the workflow engine to be oozie and the workflow xml is defined at
 /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
+The library path can be overridden with the lib attribute of the workflow element, e.g. lib="/projects/bootcamp/wf/libs,/projects/bootcamp/oozie/libs".
 
 ---++++ Pig
 
@@ -865,7 +867,7 @@ Example:
 <verbatim>
 <process name="sample-process">
 ...
-    <workflow engine="pig" path="/projects/bootcamp/pig.script"/>
+    <workflow engine="pig" path="/projects/bootcamp/pig.script" lib="/projects/bootcamp/wf/libs,/projects/bootcamp/pig/libs"/>
 ...
 </process>
 </verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index af3f44d..5f93cc2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.oozie;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -106,14 +107,33 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
 
         //Add libpath
-        String libPath = getLibPath(buildPath);
-        if (libPath != null) {
-            properties.put(OozieClient.LIBPATH, getStoragePath(libPath));
+        String libPath = getOozieLibPath(buildPath);
+        if (StringUtils.isNotBlank(libPath)) {
+            properties.put(OozieClient.LIBPATH, libPath);
         }
 
         return properties;
     }
 
+    private String getOozieLibPath(final Path buildPath) {
+        String configuredLibs = getLibPath(buildPath);  // may hold several separator-delimited paths
+        if (StringUtils.isBlank(configuredLibs)) {
+            return null;
+        }
+        StringBuilder joined = new StringBuilder();
+        for (String entry : configuredLibs.split(EntityUtil.WF_LIB_SEPARATOR)) {
+            String storagePath = getStoragePath(entry);
+            if (StringUtils.isBlank(storagePath)) {
+                continue;  // drop entries that resolve to a blank storage path
+            }
+            if (joined.length() > 0) {
+                joined.append(EntityUtil.WF_LIB_SEPARATOR);
+            }
+            joined.append(storagePath);
+        }
+        return joined.length() == 0 ? null : joined.toString();  // null if nothing was collected
+    }
+
     protected CONFIGURATION getConfig(Properties props) {
         CONFIGURATION conf = new CONFIGURATION();
         for (Entry<Object, Object> prop : props.entrySet()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index dc9349f..7d5b331 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -238,36 +238,39 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
 
     protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList, String lib)
         throws FalconException {
-        if (StringUtils.isEmpty(lib)) {
+        if (StringUtils.isBlank(lib)) {
             return;
         }
 
-        Path libPath = new Path(lib);
-        try {
-            final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {  // File, not a Dir
-                archiveList.add(libPath.toString());
-                return;
-            }
+        String[] libPaths = lib.split(EntityUtil.WF_LIB_SEPARATOR);  // several lib paths may be given
+        for (String libEntry : libPaths) {
+            Path libPath = new Path(libEntry);
+            try {
+                final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                        ClusterHelper.getConfiguration(cluster));
+                if (fs.isFile(libPath)) {  // File, not a Dir
+                    archiveList.add(libPath.toString());
+                    continue;  // not 'return': the remaining lib entries must still be processed
+                }
 
-            // lib path is a directory, add each file under the lib dir to archive
-            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    try {
-                        return fs.isFile(path) && path.getName().endsWith(".jar");
-                    } catch (IOException ignore) {
-                        return false;
+                // lib path is a directory, add each .jar file directly under it to the archive list
+                final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+                    @Override
+                    public boolean accept(Path path) {
+                        try {
+                            return fs.isFile(path) && path.getName().endsWith(".jar");
+                        } catch (IOException ignore) {
+                            return false;
+                        }
                     }
-                }
-            });
+                });
 
-            for (FileStatus fileStatus : fileStatuses) {
-                archiveList.add(fileStatus.getPath().toString());
+                for (FileStatus fileStatus : fileStatuses) {
+                    archiveList.add(fileStatus.getPath().toString());
+                }
+            } catch (IOException e) {
+                throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
             }
-        } catch (IOException e) {
-            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
         }
     }