You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2014/08/08 09:25:14 UTC
git commit: FALCON-571 user libs not getting loaded during process
execution. Contributed by Shwetha GS.
Repository: incubator-falcon
Updated Branches:
refs/heads/master c8820bd5e -> a13323ef6
FALCON-571 user libs not getting loaded during process execution. Contributed by Shwetha GS.
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a13323ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a13323ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a13323ef
Branch: refs/heads/master
Commit: a13323ef60b7fd089b3f832b02b72c5add4021d2
Parents: c8820bd
Author: Suhas V <su...@inmobi.com>
Authored: Fri Aug 8 12:49:57 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Fri Aug 8 12:49:57 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../org/apache/falcon/entity/EntityUtil.java | 2 +-
.../org/apache/falcon/entity/ProcessHelper.java | 4 +-
.../apache/falcon/oozie/OozieBundleBuilder.java | 2 -
.../falcon/oozie/OozieCoordinatorBuilder.java | 4 ++
.../apache/falcon/oozie/OozieEntityBuilder.java | 15 +++++++
.../OozieOrchestrationWorkflowBuilder.java | 44 ++++++++++----------
.../falcon/oozie/feed/FeedBundleBuilder.java | 4 --
.../process/HiveProcessWorkflowBuilder.java | 3 +-
.../process/PigProcessWorkflowBuilder.java | 3 +-
.../oozie/process/ProcessBundleBuilder.java | 5 ---
.../OozieProcessWorkflowBuilderTest.java | 8 ++++
.../resources/config/process/process-0.1.xml | 2 +-
13 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 062e318..d090789 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -28,6 +28,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-571 user libs not getting loaded during process execution
+ (Shwetha GS via Suhas Vasu)
+
FALCON-514 Falcon CLI giving error when using -file option with -rerun in
instance management. (pavan kumar kolamuri via Shwetha GS)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 5909113..73e19f5 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -594,7 +594,7 @@ public final class EntityUtil {
return filesArray;
} catch (FileNotFoundException e) {
- LOG.info("Staging path doesn't exist, entity is not scheduled", e);
+ LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
//Staging path doesn't exist if entity is not scheduled
} catch (IOException e) {
throw new FalconException(e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 44dac3c..59361e8 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -106,9 +106,9 @@ public final class ProcessHelper {
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
if (fs.isFile(libPath)) {
- return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+ return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
} else {
- return new Path(buildPath.getParent(), EntityUtil.PROCESS_USERLIB_DIR);
+ return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR);
}
} catch(IOException e) {
throw new FalconException("Failed to get user lib path", e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 6185aaf..82f7251 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -99,8 +99,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
return properties;
}
- protected abstract Path getLibPath(Cluster cluster, Path buildPath) throws FalconException;
-
protected CONFIGURATION getConfig(Properties props) {
CONFIGURATION conf = new CONFIGURATION();
for (Entry<Object, Object> prop : props.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index e354011..44f4410 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -162,4 +162,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
}
+ @Override
+ protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+ return super.getLibPath(cluster, buildPath.getParent());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 7557e3d..1c3085c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -24,6 +24,7 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
@@ -296,4 +297,18 @@ public abstract class OozieEntityBuilder<T extends Entity> {
IOUtils.closeQuietly(resourceAsStream);
}
}
+
+ protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+ switch (entity.getEntityType()) {
+ case PROCESS:
+ return ProcessHelper.getUserLibPath((Process) entity, cluster, buildPath);
+
+ case FEED:
+ return new Path(buildPath, "lib");
+
+ default:
+ }
+ throw new IllegalArgumentException("Unhandled type " + entity.getEntityType());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 083f807..fa645a5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -76,9 +76,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
- public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
- Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME,
- FAIL_POSTPROCESS_ACTION_NAME, }));
+ public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
+ new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
private final Tag lifecycle;
@@ -89,7 +88,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
throws FalconException {
- switch(entity.getEntityType()) {
+ switch (entity.getEntityType()) {
case FEED:
Feed feed = (Feed) entity;
switch (lifecycle) {
@@ -105,13 +104,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
default:
- throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
- + lifecycle);
+ throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
+ + ", lifecycle " + lifecycle);
}
case PROCESS:
Process process = (Process) entity;
- switch(process.getWorkflow().getEngine()) {
+ switch (process.getWorkflow().getEngine()) {
case PIG:
return new PigProcessWorkflowBuilder(process);
@@ -195,26 +194,24 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
protected boolean shouldPreProcess() throws FalconException {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null
|| EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
return false;
}
return true;
}
- protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
- throws FalconException {
+ protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
try {
addExtensionJars(fs, new Path(libext), wf);
addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
if (tag != null) {
- addExtensionJars(fs,
- new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), wf);
+ addExtensionJars(fs, new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()),
+ wf);
}
- } catch(IOException e) {
+ } catch (IOException e) {
throw new FalconException(e);
}
}
@@ -223,7 +220,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
FileStatus[] libs = null;
try {
libs = fs.listStatus(path);
- } catch(FileNotFoundException ignore) {
+ } catch (FileNotFoundException ignore) {
//Ok if the libext is not configured
}
@@ -231,12 +228,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return;
}
- for(FileStatus lib : libs) {
+ for (FileStatus lib : libs) {
if (lib.isDir()) {
continue;
}
- for(Object obj: wf.getDecisionOrForkOrJoin()) {
+ for (Object obj : wf.getDecisionOrForkOrJoin()) {
if (!(obj instanceof ACTION)) {
continue;
}
@@ -273,8 +270,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
}
- private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
- String prefix) throws IOException {
+ private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf, String prefix)
+ throws IOException {
OutputStream out = null;
try {
out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
@@ -308,8 +305,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
* @param workflowApp workflow xml
* @param cluster cluster entity
*/
- protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
- String credentialName, Set<String> actions) {
+ protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName,
+ Set<String> actions) {
addHCatalogCredentials(workflowApp, cluster, credentialName);
// add credential to each action
@@ -359,4 +356,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
}
+
+ @Override
+ protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+ return super.getLibPath(cluster, buildPath.getParent());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 08a4e5c..3347fbf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -38,10 +38,6 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
super(entity);
}
- @Override protected Path getLibPath(Cluster cluster, Path buildPath) {
- return new Path(buildPath, "lib");
- }
-
@Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
List<Properties> props = new ArrayList<Properties>();
List<Properties> evictionProps =
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
index 358475d..1db4ca4 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -61,8 +61,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
// adds hive-site.xml in hive classpath
hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
- addArchiveForCustomJars(cluster, hiveAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
- buildPath));
+ addArchiveForCustomJars(cluster, hiveAction.getArchive(), getLibPath(cluster, buildPath));
OozieUtils.marshalHiveAction(action, actionJaxbElement);
return action;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
index 6a83ddf..6bd5dd8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -60,8 +60,7 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
}
- addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
- buildPath));
+ addArchiveForCustomJars(cluster, pigAction.getArchive(), getLibPath(cluster, buildPath));
return action;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index d0b75fc..ab38259 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -23,7 +23,6 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -106,10 +105,6 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
}
}
- @Override protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
- return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
- }
-
@Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
copyUserWorkflow(cluster, buildPath);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4b453c7..14759e0 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -44,6 +44,7 @@ import org.apache.falcon.messaging.EntityInstanceMessage;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
@@ -629,6 +630,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
bundle.getCoordinator().get(0).getName());
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+ List<CONFIGURATION.Property> props = bundle.getCoordinator().get(0).getConfiguration().getProperty();
+ for (CONFIGURATION.Property prop : props) {
+ if(prop.getName().equals("oozie.libpath")) {
+ Assert.assertEquals(prop.getValue().replace("${nameNode}", ""), new Path(bundlePath,
+ "userlib").toString());
+ }
+ }
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
testDefCoordMap(process, coord);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a13323ef/oozie/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-0.1.xml b/oozie/src/test/resources/config/process/process-0.1.xml
index 6148441..6383a04 100644
--- a/oozie/src/test/resources/config/process/process-0.1.xml
+++ b/oozie/src/test/resources/config/process/process-0.1.xml
@@ -34,7 +34,7 @@
<property name="mapred.job.priority" value="LOW"/>
</properties>
- <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow"/>
+ <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow" lib="/user/guest/workflowlib"/>
<retry policy="periodic" delay="hours(10)" attempts="3"/>