You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/09 01:44:43 UTC
[4/5] git commit: FALCON-789 Post processing is broken for Feeds.
Contributed by Sowmya Ramesh
FALCON-789 Post processing is broken for Feeds. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/43110bd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/43110bd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/43110bd9
Branch: refs/heads/master
Commit: 43110bd9d5708963a826e9c39389ec99491a5355
Parents: d82c01d
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Oct 8 16:24:03 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Oct 8 16:24:03 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../workflow/WorkflowExecutionContext.java | 24 ++++++++++-------
.../workflow/WorkflowExecutionContextTest.java | 28 +++++++++++++++++---
.../messaging/JMSMessageConsumerTest.java | 6 +++--
4 files changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5ba06c..d7571ba 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -115,6 +115,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-789 Post processing is broken for Feeds (Sowmya Ramesh via
+ Venkatesh Seetharam)
+
FALCON-788 Lineage: Minor bug fixes (Sowmya Ramesh via Venkatesh Seetharam)
FALCON-777 UT intermittent failures in Messaging tests depending on Thread.sleep
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 8df6855..f3e643e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -126,9 +126,7 @@ public class WorkflowExecutionContext {
}
public String getContextFile() {
- return EntityType.PROCESS.name().equals(getEntityType())
- ? getValue(WorkflowExecutionArgs.CONTEXT_FILE)
- : "/context/" + getValue(WorkflowExecutionArgs.CONTEXT_FILE); // needed by feed clean up
+ return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
}
public String getLogDir() {
@@ -326,9 +324,14 @@ public class WorkflowExecutionContext {
}
}
- public static String getFilePath(String logDir, String entityName) {
+ public static String getFilePath(String logDir, String entityName, String entityType,
+ EntityOperations operation) {
+ // needed by feed clean up
+ String parentSuffix = EntityType.PROCESS.name().equals(entityType)
+ || EntityOperations.REPLICATE == operation ? "" : "/context/";
+
// LOG_DIR is sufficiently unique
- return new Path(logDir, entityName + "-wf-post-exec-context.json").toString();
+ return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString();
}
public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
@@ -346,12 +349,13 @@ public class WorkflowExecutionContext {
throw new FalconException("Error parsing wf args", e);
}
- wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
- wfProperties.put(WorkflowExecutionArgs.CONTEXT_FILE,
- getFilePath(wfProperties.get(WorkflowExecutionArgs.LOG_DIR),
- wfProperties.get(WorkflowExecutionArgs.ENTITY_NAME)));
+ WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties);
+ executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
+ executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
+ getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
+ executionContext.getEntityType(), executionContext.getOperation()));
- return new WorkflowExecutionContext(wfProperties);
+ return executionContext;
}
private static CommandLine getCommand(String[] arguments) throws ParseException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 117f2b3..65a057d 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -96,7 +96,8 @@ public class WorkflowExecutionContextTest {
@Test
public void testGetContextFile() throws Exception {
Assert.assertEquals(context.getContextFile(),
- WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName()));
+ WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName(),
+ context.getEntityType(), context.getOperation()));
}
@Test
@@ -257,9 +258,30 @@ public class WorkflowExecutionContextTest {
}
@Test
- public void testGetFilePath() throws Exception {
- Assert.assertEquals(WorkflowExecutionContext.getFilePath(LOGS_DIR, ENTITY_NAME),
+ public void testGetFilePathForProcess() throws Exception {
+ final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+ ENTITY_NAME, "PROCESS", WorkflowExecutionContext.EntityOperations.GENERATE);
+ Assert.assertEquals(filePath,
LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
+ Assert.assertEquals(context.getContextFile(), filePath);
+ }
+
+
+ @Test
+ public void testGetFilePathForFeedRetention() throws Exception {
+ final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+ ENTITY_NAME, "FEED", WorkflowExecutionContext.EntityOperations.DELETE);
+ Assert.assertEquals(filePath,
+ LOGS_DIR + "/context/" + ENTITY_NAME + "-wf-post-exec-context.json");
+ }
+
+ @Test
+ public void testGetFilePathForFeedReplication() throws Exception {
+ final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+ ENTITY_NAME, "FEED", WorkflowExecutionContext.EntityOperations.REPLICATE);
+ Assert.assertEquals(filePath,
+ LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
+ Assert.assertEquals(context.getContextFile(), filePath);
}
private static String[] getTestMessageArgs() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 974116d..fa9ecbf 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -76,7 +76,8 @@ public class JMSMessageConsumerTest {
for (int i = 0; i < 3; i++) {
WorkflowExecutionContext context = WorkflowExecutionContext.create(
getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
- context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+ context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1",
+ "process", WorkflowExecutionContext.EntityOperations.GENERATE));
MapMessage message = session.createMapMessage();
for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
@@ -89,7 +90,8 @@ public class JMSMessageConsumerTest {
WorkflowExecutionContext context = WorkflowExecutionContext.create(
getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
- context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+ context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1",
+ "process", WorkflowExecutionContext.EntityOperations.GENERATE));
MapMessage mapMessage = session.createMapMessage();
for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {