You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/09 01:44:43 UTC

[4/5] git commit: FALCON-789 Post processing is broken for Feeds. Contributed by Sowmya Ramesh

FALCON-789 Post processing is broken for Feeds. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/43110bd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/43110bd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/43110bd9

Branch: refs/heads/master
Commit: 43110bd9d5708963a826e9c39389ec99491a5355
Parents: d82c01d
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Oct 8 16:24:03 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Oct 8 16:24:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../workflow/WorkflowExecutionContext.java      | 24 ++++++++++-------
 .../workflow/WorkflowExecutionContextTest.java  | 28 +++++++++++++++++---
 .../messaging/JMSMessageConsumerTest.java       |  6 +++--
 4 files changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5ba06c..d7571ba 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -115,6 +115,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-789 Post processing is broken for Feeds (Sowmya Ramesh via
+   Venkatesh Seetharam)
+
    FALCON-788 Lineage: Minor bug fixes (Sowmya Ramesh via Venkatesh Seetharam)
 
    FALCON-777 UT intermittent failures in Messaging tests depending on Thread.sleep

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 8df6855..f3e643e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -126,9 +126,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getContextFile() {
-        return EntityType.PROCESS.name().equals(getEntityType())
-            ? getValue(WorkflowExecutionArgs.CONTEXT_FILE)
-            : "/context/" + getValue(WorkflowExecutionArgs.CONTEXT_FILE); // needed by feed clean up
+        return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
     }
 
     public String getLogDir() {
@@ -326,9 +324,14 @@ public class WorkflowExecutionContext {
         }
     }
 
-    public static String getFilePath(String logDir, String entityName) {
+    public static String getFilePath(String logDir, String entityName, String entityType,
+                                     EntityOperations operation) {
+        // needed by feed clean up
+        String parentSuffix = EntityType.PROCESS.name().equals(entityType)
+                || EntityOperations.REPLICATE == operation ? "" : "/context/";
+
         // LOG_DIR is sufficiently unique
-        return new Path(logDir, entityName + "-wf-post-exec-context.json").toString();
+        return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString();
     }
 
     public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
@@ -346,12 +349,13 @@ public class WorkflowExecutionContext {
             throw new FalconException("Error parsing wf args", e);
         }
 
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_FILE,
-                getFilePath(wfProperties.get(WorkflowExecutionArgs.LOG_DIR),
-                        wfProperties.get(WorkflowExecutionArgs.ENTITY_NAME)));
+        WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties);
+        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
+        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
+                getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
+                        executionContext.getEntityType(), executionContext.getOperation()));
 
-        return new WorkflowExecutionContext(wfProperties);
+        return executionContext;
     }
 
     private static CommandLine getCommand(String[] arguments) throws ParseException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 117f2b3..65a057d 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -96,7 +96,8 @@ public class WorkflowExecutionContextTest {
     @Test
     public void testGetContextFile() throws Exception {
         Assert.assertEquals(context.getContextFile(),
-                WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName()));
+                WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName(),
+                        context.getEntityType(), context.getOperation()));
     }
 
     @Test
@@ -257,9 +258,30 @@ public class WorkflowExecutionContextTest {
     }
 
     @Test
-    public void testGetFilePath() throws Exception {
-        Assert.assertEquals(WorkflowExecutionContext.getFilePath(LOGS_DIR, ENTITY_NAME),
+    public void testGetFilePathForProcess() throws Exception {
+        final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+                ENTITY_NAME, "PROCESS", WorkflowExecutionContext.EntityOperations.GENERATE);
+        Assert.assertEquals(filePath,
                 LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
+        Assert.assertEquals(context.getContextFile(), filePath);
+    }
+
+
+    @Test
+    public void testGetFilePathForFeedRetention() throws Exception {
+        final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+                ENTITY_NAME, "FEED", WorkflowExecutionContext.EntityOperations.DELETE);
+        Assert.assertEquals(filePath,
+                LOGS_DIR + "/context/" + ENTITY_NAME + "-wf-post-exec-context.json");
+    }
+
+    @Test
+    public void testGetFilePathForFeedReplication() throws Exception {
+        final String filePath = WorkflowExecutionContext.getFilePath(LOGS_DIR,
+                ENTITY_NAME, "FEED", WorkflowExecutionContext.EntityOperations.REPLICATE);
+        Assert.assertEquals(filePath,
+                LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
+        Assert.assertEquals(context.getContextFile(), filePath);
     }
 
     private static String[] getTestMessageArgs() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/43110bd9/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 974116d..fa9ecbf 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -76,7 +76,8 @@ public class JMSMessageConsumerTest {
         for (int i = 0; i < 3; i++) {
             WorkflowExecutionContext context = WorkflowExecutionContext.create(
                     getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
-            context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+            context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1",
+                    "process", WorkflowExecutionContext.EntityOperations.GENERATE));
 
             MapMessage message = session.createMapMessage();
             for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
@@ -89,7 +90,8 @@ public class JMSMessageConsumerTest {
 
         WorkflowExecutionContext context = WorkflowExecutionContext.create(
                 getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
-        context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+        context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1",
+                "process", WorkflowExecutionContext.EntityOperations.GENERATE));
 
         MapMessage mapMessage = session.createMapMessage();
         for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {