You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2014/08/08 09:25:14 UTC

git commit: FALCON-571 user libs not getting loaded during process execution. Contributed by Shwetha GS.

Repository: incubator-falcon
Updated Branches:
  refs/heads/master c8820bd5e -> a13323ef6


FALCON-571 user libs not getting loaded during process execution. Contributed by Shwetha GS.


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a13323ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a13323ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a13323ef

Branch: refs/heads/master
Commit: a13323ef60b7fd089b3f832b02b72c5add4021d2
Parents: c8820bd
Author: Suhas V <su...@inmobi.com>
Authored: Fri Aug 8 12:49:57 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Fri Aug 8 12:49:57 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../org/apache/falcon/entity/EntityUtil.java    |  2 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  4 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  2 -
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  4 ++
 .../apache/falcon/oozie/OozieEntityBuilder.java | 15 +++++++
 .../OozieOrchestrationWorkflowBuilder.java      | 44 ++++++++++----------
 .../falcon/oozie/feed/FeedBundleBuilder.java    |  4 --
 .../process/HiveProcessWorkflowBuilder.java     |  3 +-
 .../process/PigProcessWorkflowBuilder.java      |  3 +-
 .../oozie/process/ProcessBundleBuilder.java     |  5 ---
 .../OozieProcessWorkflowBuilderTest.java        |  8 ++++
 .../resources/config/process/process-0.1.xml    |  2 +-
 13 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 062e318..d090789 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-571 user libs not getting loaded during process execution 
+   (Shwetha GS via Suhas Vasu)
+
    FALCON-514 Falcon CLI giving error when using -file option with -rerun in 
    instance management. (pavan kumar kolamuri via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 5909113..73e19f5 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -594,7 +594,7 @@ public final class EntityUtil {
             return filesArray;
 
         } catch (FileNotFoundException e) {
-            LOG.info("Staging path doesn't exist, entity is not scheduled", e);
+            LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
             //Staging path doesn't exist if entity is not scheduled
         } catch (IOException e) {
             throw new FalconException(e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 44dac3c..59361e8 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -106,9 +106,9 @@ public final class ProcessHelper {
 
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {
-                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+                return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
             } else {
-                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR);
+                return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR);
             }
         } catch(IOException e) {
             throw new FalconException("Failed to get user lib path", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 6185aaf..82f7251 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -99,8 +99,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return properties;
     }
 
-    protected abstract Path getLibPath(Cluster cluster, Path buildPath) throws FalconException;
-
     protected CONFIGURATION getConfig(Properties props) {
         CONFIGURATION conf = new CONFIGURATION();
         for (Entry<Object, Object> prop : props.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index e354011..44f4410 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -162,4 +162,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
     }
 
+    @Override
+    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+        return super.getLibPath(cluster, buildPath.getParent());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 7557e3d..1c3085c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -24,6 +24,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
@@ -296,4 +297,18 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             IOUtils.closeQuietly(resourceAsStream);
         }
     }
+
+    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+        switch (entity.getEntityType()) {
+        case PROCESS:
+            return ProcessHelper.getUserLibPath((Process) entity, cluster, buildPath);
+
+        case FEED:
+            return new Path(buildPath, "lib");
+
+        default:
+        }
+        throw new IllegalArgumentException("Unhandled type " + entity.getEntityType());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 083f807..fa645a5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -76,9 +76,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
     private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
 
-    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
-        Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME,
-            FAIL_POSTPROCESS_ACTION_NAME, }));
+    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
+        new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
 
     private final Tag lifecycle;
 
@@ -89,7 +88,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
     public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
         throws FalconException {
-        switch(entity.getEntityType()) {
+        switch (entity.getEntityType()) {
         case FEED:
             Feed feed = (Feed) entity;
             switch (lifecycle) {
@@ -105,13 +104,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
                 }
 
             default:
-                throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
-                    + lifecycle);
+                throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
+                    + ", lifecycle " + lifecycle);
             }
 
         case PROCESS:
             Process process = (Process) entity;
-            switch(process.getWorkflow().getEngine()) {
+            switch (process.getWorkflow().getEngine()) {
             case PIG:
                 return new PigProcessWorkflowBuilder(process);
 
@@ -195,26 +194,24 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     protected boolean shouldPreProcess() throws FalconException {
-        if (EntityUtil.getLateProcess(entity) == null
-            || EntityUtil.getLateProcess(entity).getLateInputs() == null
+        if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null
             || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
             return false;
         }
         return true;
     }
 
-    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
-        throws FalconException {
+    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
         FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
             if (tag != null) {
-                addExtensionJars(fs,
-                    new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), wf);
+                addExtensionJars(fs, new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()),
+                    wf);
             }
-        } catch(IOException e) {
+        } catch (IOException e) {
             throw new FalconException(e);
         }
     }
@@ -223,7 +220,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         FileStatus[] libs = null;
         try {
             libs = fs.listStatus(path);
-        } catch(FileNotFoundException ignore) {
+        } catch (FileNotFoundException ignore) {
             //Ok if the libext is not configured
         }
 
@@ -231,12 +228,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
             return;
         }
 
-        for(FileStatus lib : libs) {
+        for (FileStatus lib : libs) {
             if (lib.isDir()) {
                 continue;
             }
 
-            for(Object obj: wf.getDecisionOrForkOrJoin()) {
+            for (Object obj : wf.getDecisionOrForkOrJoin()) {
                 if (!(obj instanceof ACTION)) {
                     continue;
                 }
@@ -273,8 +270,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         }
     }
 
-    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
-        String prefix) throws IOException {
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf, String prefix)
+        throws IOException {
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
@@ -308,8 +305,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
      * @param workflowApp workflow xml
      * @param cluster     cluster entity
      */
-    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
-        String credentialName, Set<String> actions) {
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName,
+        Set<String> actions) {
         addHCatalogCredentials(workflowApp, cluster, credentialName);
 
         // add credential to each action
@@ -359,4 +356,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
         action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
     }
+
+    @Override
+    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+        return super.getLibPath(cluster, buildPath.getParent());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 08a4e5c..3347fbf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -38,10 +38,6 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         super(entity);
     }
 
-    @Override protected Path getLibPath(Cluster cluster, Path buildPath) {
-        return new Path(buildPath, "lib");
-    }
-
     @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
         List<Properties> props = new ArrayList<Properties>();
         List<Properties> evictionProps =

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
index 358475d..1db4ca4 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -61,8 +61,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         // adds hive-site.xml in hive classpath
         hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
 
-        addArchiveForCustomJars(cluster, hiveAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
-            buildPath));
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(), getLibPath(cluster, buildPath));
 
         OozieUtils.marshalHiveAction(action, actionJaxbElement);
         return action;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
index 6a83ddf..6bd5dd8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -60,8 +60,7 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
-        addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
-            buildPath));
+        addArchiveForCustomJars(cluster, pigAction.getArchive(), getLibPath(cluster, buildPath));
 
         return action;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index d0b75fc..ab38259 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -23,7 +23,6 @@ import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -106,10 +105,6 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
         }
     }
 
-    @Override protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
-        return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
-    }
-
     @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
         copyUserWorkflow(cluster, buildPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4b453c7..14759e0 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -44,6 +44,7 @@ import org.apache.falcon.messaging.EntityInstanceMessage;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.CONFIGURATION;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
@@ -629,6 +630,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
             bundle.getCoordinator().get(0).getName());
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+        List<CONFIGURATION.Property> props = bundle.getCoordinator().get(0).getConfiguration().getProperty();
+        for (CONFIGURATION.Property prop : props) {
+            if(prop.getName().equals("oozie.libpath")) {
+                Assert.assertEquals(prop.getValue().replace("${nameNode}", ""), new Path(bundlePath,
+                    "userlib").toString());
+            }
+        }
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         testDefCoordMap(process, coord);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-0.1.xml b/oozie/src/test/resources/config/process/process-0.1.xml
index 6148441..6383a04 100644
--- a/oozie/src/test/resources/config/process/process-0.1.xml
+++ b/oozie/src/test/resources/config/process/process-0.1.xml
@@ -34,7 +34,7 @@
         <property name="mapred.job.priority" value="LOW"/>
     </properties>
 
-    <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow"/>
+    <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow" lib="/user/guest/workflowlib"/>
 
     <retry policy="periodic" delay="hours(10)" attempts="3"/>