You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/08/26 08:48:50 UTC
git commit: FALCON-630 late data rerun for process broken in trunk.
Contributed by Shwetha GS
Repository: incubator-falcon
Updated Branches:
refs/heads/master 5000fbbd6 -> 06f52eca7
FALCON-630 late data rerun for process broken in trunk. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/06f52eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/06f52eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/06f52eca
Branch: refs/heads/master
Commit: 06f52eca7b324b779aa88bdaaa12df618a03fdb2
Parents: 5000fbb
Author: Shwetha GS <sh...@inmobi.com>
Authored: Tue Aug 26 12:17:56 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Tue Aug 26 12:18:32 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../falcon/workflow/WorkflowExecutionArgs.java | 8 +++---
.../workflow/WorkflowExecutionContext.java | 8 +++---
.../metadata/MetadataMappingServiceTest.java | 4 +--
.../workflow/WorkflowExecutionContextTest.java | 4 +--
.../WorkflowJobEndNotificationServiceTest.java | 4 +--
.../falcon/messaging/JMSMessageProducer.java | 8 +++---
.../falcon/messaging/FeedProducerTest.java | 12 ++++-----
.../messaging/JMSMessageConsumerTest.java | 4 +--
.../messaging/JMSMessageProducerTest.java | 24 +++++++++---------
.../falcon/messaging/ProcessProducerTest.java | 12 ++++-----
.../feed/FeedReplicationCoordinatorBuilder.java | 11 +++++----
.../feed/FeedRetentionCoordinatorBuilder.java | 4 +--
.../ProcessExecutionCoordinatorBuilder.java | 26 +++++++++++---------
.../src/main/resources/action/post-process.xml | 2 +-
oozie/src/main/resources/action/pre-process.xml | 4 +--
.../feed/OozieFeedWorkflowBuilderTest.java | 3 +++
.../OozieProcessWorkflowBuilderTest.java | 15 +++++++----
.../workflow/FalconPostProcessingTest.java | 12 ++++-----
.../metadata/LineageMetadataResourceTest.java | 4 +--
.../apache/falcon/latedata/LateDataHandler.java | 10 +++++---
.../falcon/rerun/handler/LateRerunConsumer.java | 22 +++++++++--------
.../apache/falcon/late/LateDataHandlerIT.java | 4 +--
23 files changed, 114 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e3f919..36ca573 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-630 late data rerun for process broken in trunk. (Shwetha GS)
+
FALCON-611 Post process arg status is in 'FAILED' state always
(Shwetha GS via Suhas Vasu)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 92af3e1..514bafe 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -58,11 +58,13 @@ public enum WorkflowExecutionArgs {
// what inputs
INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs", false),
- INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths", false),
+ INPUT_FEED_PATHS("falconInPaths", "comma separated input feed instance paths", false),
+ INPUT_NAMES("falconInputNames", "name of the inputs", false),
+ INPUT_STORAGE_TYPES("falconInputFeedStorageTypes", "input storage types", false),
// what outputs
- FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
- FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
+ OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
+ OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"),
// broker related parameters
TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 786e94f..f5bb782 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -80,8 +80,8 @@ public class WorkflowExecutionContext {
WorkflowExecutionArgs.NOMINAL_TIME,
WorkflowExecutionArgs.OPERATION,
- WorkflowExecutionArgs.FEED_NAMES,
- WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
+ WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
+ WorkflowExecutionArgs.OUTPUT_FEED_PATHS,
WorkflowExecutionArgs.WORKFLOW_ID,
WorkflowExecutionArgs.WORKFLOW_USER,
@@ -177,7 +177,7 @@ public class WorkflowExecutionContext {
}
public String getOutputFeedNames() {
- return getValue(WorkflowExecutionArgs.FEED_NAMES);
+ return getValue(WorkflowExecutionArgs.OUTPUT_FEED_NAMES);
}
public String[] getOutputFeedNamesList() {
@@ -185,7 +185,7 @@ public class WorkflowExecutionContext {
}
public String getOutputFeedInstancePaths() {
- return getValue(WorkflowExecutionArgs.FEED_INSTANCE_PATHS);
+ return getValue(WorkflowExecutionArgs.OUTPUT_FEED_PATHS);
}
public String[] getOutputFeedInstancePathsList() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index b51caf8..2b030fd 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -644,8 +644,8 @@ public class MetadataMappingServiceTest {
"-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index e97175e..a45633b 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -273,8 +273,8 @@ public class WorkflowExecutionContextTest {
"-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 2fe08e4..9a6ad98 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -135,8 +135,8 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
"-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 39d6fab..fc31bab 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -205,13 +205,13 @@ public class JMSMessageProducer {
// override default values
if (context.getEntityType().equalsIgnoreCase("PROCESS")) {
- change(message, WorkflowExecutionArgs.FEED_NAMES, feedNames[i]);
+ change(message, WorkflowExecutionArgs.OUTPUT_FEED_NAMES, feedNames[i]);
} else {
- change(message, WorkflowExecutionArgs.FEED_NAMES,
- message.get(WorkflowExecutionArgs.FEED_NAMES.getName()));
+ change(message, WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
+ message.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()));
}
- change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]);
+ change(message, WorkflowExecutionArgs.OUTPUT_FEED_PATHS, feedPaths[i]);
messages.add(message);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 1c10be5..c45ea1e 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -71,8 +71,8 @@ public class FeedProducerTest {
args = new String[] {
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), TOPIC_NAME,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/click-logs/10/05/05/00/20",
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -179,7 +179,7 @@ public class FeedProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/falcon/feed/agg-logs/path1/2010/10/10/20");
for (m = null; m == null;) {
@@ -187,7 +187,7 @@ public class FeedProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/falcon/feed/agg-logs/path1/2010/10/10/21");
for (m = null; m == null;) {
@@ -195,7 +195,7 @@ public class FeedProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/falcon/feed/agg-logs/path1/2010/10/10/22");
for (m = null; m == null;) {
@@ -203,7 +203,7 @@ public class FeedProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/falcon/feed/agg-logs/path1/2010/10/10/23");
connection.close();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index b1f8271..9a4a6f7 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -105,9 +105,9 @@ public class JMSMessageConsumerTest {
message.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), "cluster1");
message.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), "process1");
message.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), "PROCESS");
- message.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ message.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/clicks/hour/00/0" + i);
- message.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "clicks");
+ message.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "clicks");
message.put(WorkflowExecutionArgs.LOG_FILE.getName(), "/logfile");
message.put(WorkflowExecutionArgs.LOG_DIR.getName(), "/tmp/log");
message.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2012-10-10-10-10");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index 34cff77..e4ea22f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -72,8 +72,8 @@ public class JMSMessageProducerTest {
List<String> args = createCommonArgs();
List<String> newArgs = new ArrayList<String>(Arrays.asList(
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
args.addAll(newArgs);
List<String[]> messages = new ArrayList<String[]>();
@@ -81,10 +81,10 @@ public class JMSMessageProducerTest {
testProcessMessageCreator(messages, TOPIC_NAME);
for (MapMessage m : mapMessages) {
assertMessage(m);
- Assert.assertTrue((m.getString(WorkflowExecutionArgs.FEED_NAMES.getName())
+ Assert.assertTrue((m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName())
.equals("click-logs,raw-logs")));
Assert.assertTrue(m
- .getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+ .getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName())
.equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
}
}
@@ -94,8 +94,8 @@ public class JMSMessageProducerTest {
List<String> args = createCommonArgs();
List<String> newArgs = new ArrayList<String>(Arrays.asList(
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "null",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "null"));
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "null",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "null"));
args.addAll(newArgs);
List<String[]> messages = new ArrayList<String[]>();
messages.add(args.toArray(new String[args.size()]));
@@ -103,9 +103,9 @@ public class JMSMessageProducerTest {
for (MapMessage m : mapMessages) {
assertMessage(m);
assertMessage(m);
- Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()).equals(
+ Assert.assertTrue(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()).equals(
"null"));
- Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+ Assert.assertTrue(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName())
.equals("null"));
}
}
@@ -116,8 +116,8 @@ public class JMSMessageProducerTest {
List<String> args = createCommonArgs();
List<String> newArgs = new ArrayList<String>(Arrays.asList(
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "raw-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "raw-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/raw-logs/10/05/05/00/20"));
args.addAll(newArgs);
messages.add(args.toArray(new String[args.size()]));
@@ -125,8 +125,8 @@ public class JMSMessageProducerTest {
args = createCommonArgs();
newArgs = new ArrayList<String>(Arrays.asList(
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/click-logs/10/05/05/00/20"));
args.addAll(newArgs);
messages.add(args.toArray(new String[args.size()]));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index ccb47df..80c2701 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -52,8 +52,8 @@ public class ProcessProducerTest {
public void setup() throws Exception {
args = new String[] {
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -133,8 +133,8 @@ public class ProcessProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "click-logs");
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "click-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/click-logs/10/05/05/00/20");
for (m = null; m == null;) {
@@ -142,8 +142,8 @@ public class ProcessProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "raw-logs");
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "raw-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/raw-logs/10/05/05/00/20");
connection.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index f0864db..966f90e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -261,16 +261,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
String falconFeedStorageType, Properties props) {
// todo these pairs are the same but used in different context
// late data handler - should-record action
- props.put("falconInputFeeds", entity.getName());
- props.put("falconInPaths", instancePaths);
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), instancePaths);
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), entity.getName());
// storage type for each corresponding feed - in this case only one feed is involved
// needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+ props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), falconFeedStorageType);
// falcon post processing
- props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
- props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "${coord:dataOut('output')}");
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
}
private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 3c74485..c896d5a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -86,8 +86,8 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
props.put("limit", feedCluster.getRetention().getLimit().toString());
- props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
- props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
props.put("falconInputFeeds", entity.getName());
props.put("falconInPaths", IGNORE);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index a33fa62..1fa6758 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -152,12 +152,14 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
private void initializeInputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
if (entity.getInputs() == null) {
- props.put("falconInputFeeds", "NONE");
- props.put("falconInPaths", IGNORE);
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "NONE");
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), IGNORE);
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), IGNORE);
return;
}
List<String> inputFeeds = new ArrayList<String>();
+ List<String> inputNames = new ArrayList<String>();
List<String> inputPaths = new ArrayList<String>();
List<String> inputFeedStorageTypes = new ArrayList<String>();
for (Input input : entity.getInputs().getInputs()) {
@@ -190,21 +192,23 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
inputFeeds.add(feed.getName());
inputPaths.add(inputExpr);
+ inputNames.add(input.getName());
inputFeedStorageTypes.add(storage.getType().name());
}
- propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+ propagateLateDataProperties(inputFeeds, inputNames, inputPaths, inputFeedStorageTypes, props);
}
- private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+ private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputNames, List<String> inputPaths,
List<String> inputFeedStorageTypes, Properties props) {
// populate late data handler - should-record action
- props.put("falconInputFeeds", StringUtils.join(inputFeeds, '#'));
- props.put("falconInPaths", StringUtils.join(inputPaths, '#'));
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(inputFeeds, '#'));
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(inputNames, '#'));
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(inputPaths, '#'));
// storage type for each corresponding feed sent as a param to LateDataHandler
// needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", StringUtils.join(inputFeedStorageTypes, '#'));
+ props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), StringUtils.join(inputFeedStorageTypes, '#'));
}
private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
@@ -250,8 +254,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
if (entity.getOutputs() == null) {
- props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "NONE");
- props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE");
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
return;
}
@@ -289,8 +293,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
}
// Output feed name and path for parent workflow
- props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
- props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), StringUtils.join(outputPaths, ','));
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputPaths, ','));
}
private DATAOUT createDataOut(Output output) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 2714859..979d4f0 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -80,7 +80,7 @@
<arg>${wf:user()}</arg>
<arg>-falconInputFeeds</arg>
<arg>${falconInputFeeds}</arg>
- <arg>-falconInputPaths</arg>
+ <arg>-falconInPaths</arg>
<arg>${falconInPaths}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/main/resources/action/pre-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/pre-process.xml b/oozie/src/main/resources/action/pre-process.xml
index 127ab80..070c42b 100644
--- a/oozie/src/main/resources/action/pre-process.xml
+++ b/oozie/src/main/resources/action/pre-process.xml
@@ -39,8 +39,8 @@
<arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
<arg>-paths</arg>
<arg>${falconInPaths}</arg>
- <arg>-falconInputFeeds</arg>
- <arg>${falconInputFeeds}</arg>
+ <arg>-falconInputNames</arg>
+ <arg>${falconInputNames}</arg>
<arg>-falconInputFeedStorageTypes</arg>
<arg>${falconInputFeedStorageTypes}</arg>
<capture-output/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index e47895f..3c49353 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -48,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -206,6 +207,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), feed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
@@ -443,6 +445,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), tableFeed.getName());
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
// verify the post processing params
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4daf5d8..b1d7a8a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -188,6 +188,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
assertEquals(props.get("mapred.job.priority"), "LOW");
+ List<Input> inputs = process.getInputs().getInputs();
+ assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs
+ .get(1).getName());
verifyEntityProperties(process, cluster,
WorkflowExecutionContext.EntityOperations.GENERATE, props);
@@ -558,6 +561,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+ Assert.assertEquals(props.get("falconInputs"), process.getInputs().getInputs().get(0).getName());
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
@@ -688,10 +692,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
verifyBrokerProperties(cluster, props);
String[] expected = {
- WorkflowExecutionArgs.FEED_NAMES.getName(),
- WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
+ WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
- "falconInPaths",
+ WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
+ WorkflowExecutionArgs.INPUT_NAMES.getName(),
WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
@@ -729,7 +734,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
verifyBrokerProperties(cluster, props);
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
- Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "NONE");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE");
}
@Test
@@ -758,7 +763,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
WorkflowExecutionContext.EntityOperations.GENERATE, props);
verifyBrokerProperties(cluster, props);
- Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "impressions");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions");
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 91559a5..201b682 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -46,8 +46,8 @@ public class FalconPostProcessingTest {
public void setup() throws Exception {
args = new String[]{
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
"/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
@@ -131,13 +131,13 @@ public class FalconPostProcessingTest {
assertMessage(m);
if (topic.equals(FALCON_TOPIC_NAME)) {
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"out-click-logs,out-raw-logs");
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20");
} else {
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "out-click-logs");
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "out-click-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
"/out-click-logs/10/05/05/00/20");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index d9e9f86..9bef7f5 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -542,8 +542,8 @@ public class LineageMetadataResourceTest {
"-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
- "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 75de12e..d854bdd 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -72,12 +73,12 @@ public class LateDataHandler extends Configured implements Tool {
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("falconInputFeeds", true,
+ opt = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true,
"Input feed names, further separated by #");
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("falconInputFeedStorageTypes", true,
+ opt = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true,
"Feed storage types corresponding to Input feed names, separated by #");
opt.setRequired(true);
options.addOption(opt);
@@ -94,9 +95,10 @@ public class LateDataHandler extends Configured implements Tool {
return 0;
}
- String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
+ String[] inputFeeds = getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#");
String[] pathGroups = pathStr.split("#");
- String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
+ String[] inputFeedStorageTypes =
+ getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#");
Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 16e340e..80a3b83 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -26,6 +26,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.latedata.LateDataHandler;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -87,11 +88,12 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
LateDataHandler late = new LateDataHandler();
Properties properties = handler.getWfEngine().getWorkflowProperties(
message.getClusterName(), message.getWfId());
- String falconInputFeeds = properties.getProperty("falconInputFeeds");
- String falconInPaths = properties.getProperty("falconInPaths");
- String falconInputFeedStorageTypes = properties.getProperty("falconInputFeedStorageTypes");
- String logDir = properties.getProperty("logDir");
- String nominalTime = properties.getProperty("nominalTime");
+ String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
+ String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
+ String falconInputFeedStorageTypes =
+ properties.getProperty(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName());
+ String logDir = properties.getProperty(WorkflowExecutionArgs.LOG_DIR.getName());
+ String nominalTime = properties.getProperty(WorkflowExecutionArgs.NOMINAL_TIME.getName());
String srcClusterName = properties.getProperty("srcClusterName");
Path lateLogPath = handler.getLateLogPath(logDir, nominalTime, srcClusterName);
@@ -104,22 +106,22 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
}
String[] pathGroups = falconInPaths.split("#");
- String[] inputFeeds = falconInputFeeds.split("#");
+ String[] inputs = falconInputs.split("#");
String[] inputFeedStorageTypes = falconInputFeedStorageTypes.split("#");
Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
Entity entity = EntityUtil.getEntity(message.getEntityType(), message.getEntityName());
if (EntityUtil.getLateProcess(entity) != null) {
- List<String> lateFeed = new ArrayList<String>();
+ List<String> lateInput = new ArrayList<String>();
for (LateInput li : EntityUtil.getLateProcess(entity).getLateInputs()) {
- lateFeed.add(li.getInput());
+ lateInput.add(li.getInput());
}
for (int index = 0; index < pathGroups.length; index++) {
- if (lateFeed.contains(inputFeeds[index])) {
+ if (lateInput.contains(inputs[index])) {
long computedMetric = late.computeStorageMetric(
pathGroups[index], inputFeedStorageTypes[index], conf);
- computedMetrics.put(inputFeeds[index], computedMetric);
+ computedMetrics.put(inputs[index], computedMetric);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06f52eca/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index ab60307..96c99c5 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -118,7 +118,7 @@ public class LateDataHandlerIT {
String[] args = {
"-out", lateDataDir,
"-paths", feedUriTemplate,
- "-falconInputFeeds", "foo",
+ "-falconInputNames", "foo",
"-falconInputFeedStorageTypes", "TABLE",
};
@@ -158,7 +158,7 @@ public class LateDataHandlerIT {
String[] args = {
"-out", lateDataDir,
"-paths", feedUriTemplate,
- "-falconInputFeeds", "foo",
+ "-falconInputNames", "foo",
"-falconInputFeedStorageTypes", "TABLE",
};