You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:49 UTC

[07/18] git commit: FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam

FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/07397774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/07397774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/07397774

Branch: refs/heads/FALCON-585
Commit: 0739777413ac8487a76b9983aa23ecb6cf42104d
Parents: a05a288
Author: Suhas V <su...@inmobi.com>
Authored: Tue Aug 19 12:38:29 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Tue Aug 19 12:38:29 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../org/apache/falcon/entity/FeedHelper.java    |  5 +-
 .../workflow/WorkflowExecutionContext.java      |  8 +-
 .../WorkflowJobEndNotificationService.java      | 18 ++++-
 .../workflow/WorkflowExecutionContextTest.java  |  2 +-
 .../falcon/messaging/JMSMessageProducer.java    | 12 ---
 .../falcon/messaging/FeedProducerTest.java      |  4 +-
 .../messaging/JMSMessageConsumerTest.java       |  2 +-
 .../messaging/JMSMessageProducerTest.java       |  4 +-
 .../falcon/messaging/ProcessProducerTest.java   |  4 +-
 .../falcon/oozie/OozieCoordinatorBuilder.java   | 79 +++++++++++++-------
 .../OozieOrchestrationWorkflowBuilder.java      | 23 ++++--
 .../feed/FeedReplicationCoordinatorBuilder.java | 14 +++-
 .../feed/FeedReplicationWorkflowBuilder.java    | 13 +---
 .../feed/FeedRetentionCoordinatorBuilder.java   | 12 ++-
 .../feed/FeedRetentionWorkflowBuilder.java      |  5 +-
 .../ProcessExecutionCoordinatorBuilder.java     |  9 ++-
 .../ProcessExecutionWorkflowBuilder.java        |  8 +-
 .../src/main/resources/action/post-process.xml  |  6 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 69 ++++++++---------
 .../falcon/oozie/process/AbstractTestBase.java  | 51 +++++++++++++
 .../OozieProcessWorkflowBuilderTest.java        | 74 ++++++++----------
 .../workflow/FalconPostProcessingTest.java      |  4 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |  2 +
 24 files changed, 260 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4dfdc4..0fc3608 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-583 Post processing is broken in current trunk
+   (Venkatesh Seetharam via Suhas Vasu)
+
    FALCON-582 Latest changes to LICENSE files results in build failure
    (Srikanth Sundarrajan via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d09a12f..323188d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Property;
@@ -275,9 +276,9 @@ public final class FeedHelper {
                 + storage.getTable();
     }
 
-    public static Properties getUserWorkflowProperties(String policy) {
+    public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) {
         Properties props = new Properties();
-        props.put("userWorkflowName", policy + "-policy");
+        props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy");
         props.put("userWorkflowEngine", "falcon");
 
         String version;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 637cc3e..786e94f 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -50,7 +50,7 @@ public class WorkflowExecutionContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class);
 
-    public static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
+    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
@@ -145,10 +145,10 @@ public class WorkflowExecutionContext {
      * @return a ISO8601 formatted string
      */
     public String getNominalTimeAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), PROCESS_INSTANCE_FORMAT);
+        return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT);
     }
 
-    public String getTimestamp() {
+    String getTimestamp() {
         return getValue(WorkflowExecutionArgs.TIMESTAMP);
     }
 
@@ -157,7 +157,7 @@ public class WorkflowExecutionContext {
      * @return a ISO8601 formatted string
      */
     public String getTimeStampAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), PROCESS_INSTANCE_FORMAT);
+        return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT);
     }
 
     public String getClusterName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 39237f9..67f6c79 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -27,6 +27,8 @@ import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.LinkedHashSet;
@@ -37,6 +39,8 @@ import java.util.Set;
  */
 public class WorkflowJobEndNotificationService implements FalconService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
+
     public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
 
     private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
@@ -75,7 +79,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
         for (WorkflowExecutionListener listener : listeners) {
-            listener.onFailure(context);
+            try {
+                listener.onFailure(context);
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
         }
 
         instrumentAlert(context);
@@ -83,7 +92,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
         for (WorkflowExecutionListener listener : listeners) {
-            listener.onSuccess(context);
+            try {
+                listener.onSuccess(context);
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
         }
 
         instrumentAlert(context);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 93d8831..e97175e 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -52,7 +52,7 @@ public class WorkflowExecutionContextTest {
     private static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
 
     private static final String ISO8601_TIME = SchemaHelper.formatDateUTCToISO8601(
-            NOMINAL_TIME, WorkflowExecutionContext.PROCESS_INSTANCE_FORMAT);
+            NOMINAL_TIME, WorkflowExecutionContext.INSTANCE_FORMAT);
 
     private WorkflowExecutionContext context;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index aeadf5c..39d6fab 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.messaging;
 
-import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
@@ -213,7 +212,6 @@ public class JMSMessageProducer {
             }
 
             change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]);
-            convertDateFormat(message);
             messages.add(message);
         }
 
@@ -320,16 +318,6 @@ public class JMSMessageProducer {
         return message;
     }
 
-    public void convertDateFormat(Map<String, String> message) {
-        String date = message.get(WorkflowExecutionArgs.NOMINAL_TIME.getName());
-        change(message, WorkflowExecutionArgs.NOMINAL_TIME,
-                SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm"));
-
-        date = message.get(WorkflowExecutionArgs.TIMESTAMP.getName());
-        change(message, WorkflowExecutionArgs.TIMESTAMP,
-                SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm"));
-    }
-
     @SuppressWarnings("unchecked")
     private Connection createAndStartConnection(String implementation, String userName,
                                                 String password, String url)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 9119624..1c10be5 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -219,9 +219,9 @@ public class FeedProducerTest {
                 "falcon");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
-                "2011-01-01T01:00Z");
+                "2011-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
-                "2012-01-01T01:00Z");
+                "2012-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 694d488..b1f8271 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -86,7 +86,7 @@ public class JMSMessageConsumerTest {
         }
 
         WorkflowExecutionContext context = WorkflowExecutionContext.create(
-                getMockFalconMessage(15), WorkflowExecutionContext.Type.POST_PROCESSING);
+                getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
         context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
 
         MapMessage mapMessage = session.createMapMessage();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index 90efa3e..34cff77 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -210,9 +210,9 @@ public class JMSMessageProducerTest {
         Assert.assertEquals(message.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 "agg-coord");
         Assert.assertEquals(message.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
-                "2011-01-01T01:00Z");
+                "2011-01-01-01-00");
         Assert.assertEquals(message.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
-                "2012-01-01T01:00Z");
+                "2012-01-01-01-00");
         Assert.assertEquals(message.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 95a3780..ccb47df 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -160,9 +160,9 @@ public class ProcessProducerTest {
                 "falcon");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
-                "2011-01-01T01:00Z");
+                "2011-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
-                "2012-01-01T01:00Z");
+                "2012-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 459307c..fe2136b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -36,6 +37,7 @@ import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
 
@@ -55,11 +57,19 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     protected static final String MR_JOB_PRIORITY = "jobPriority";
 
     protected static final String IGNORE = "IGNORE";
-    protected final Tag lifecycle;
+    protected final LifeCycle lifecycle;
 
-    public OozieCoordinatorBuilder(T entity, Tag tag) {
+    public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) {
         super(entity);
-        this.lifecycle = tag;
+        this.lifecycle = lifecycle;
+    }
+
+    public LifeCycle getLifecycle() {
+        return lifecycle;
+    }
+
+    public Tag getTag() {
+        return lifecycle.getTag();
     }
 
     public static OozieCoordinatorBuilder get(Entity entity, Tag tag) {
@@ -87,11 +97,11 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     }
 
     protected Path getBuildPath(Path buildPath) {
-        return new Path(buildPath, lifecycle.name());
+        return new Path(buildPath, getTag().name());
     }
 
     protected String getEntityName() {
-        return EntityUtil.getWorkflowName(lifecycle, entity).toString();
+        return EntityUtil.getWorkflowName(getTag(), entity).toString();
     }
 
     protected Path marshal(Cluster cluster, COORDINATORAPP coord,
@@ -104,26 +114,32 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
                                                          String coordName)  throws FalconException {
         Properties props = new Properties();
         props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
         props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
         props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
-        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
-        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-        String falconBrokerUrl = StartupProperties.get().getProperty(
-                WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true");
-        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
-        String falconBrokerImplClass = StartupProperties.get().getProperty(
-                WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
-        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
-            DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
-        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
-        props.put("logDir", getLogDirectory(cluster));
+        props.put("falconDataOperation", getOperation().name());
+
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());
-        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+        addLateDataProperties(props);
+        addBrokerProperties(cluster, props);
 
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+
+        //props in entity override the set props.
+        props.putAll(getEntityProperties(entity));
+        return props;
+    }
+
+    protected abstract WorkflowExecutionContext.EntityOperations getOperation();
+
+    private void addLateDataProperties(Properties props) throws FalconException {
         if (EntityUtil.getLateProcess(entity) == null
             || EntityUtil.getLateProcess(entity).getLateInputs() == null
             || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
@@ -131,16 +147,25 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         } else {
             props.put("shouldRecord", "true");
         }
+    }
 
-        props.put("entityName", entity.getName());
-        props.put("entityType", entity.getEntityType().name().toLowerCase());
-        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+    private void addBrokerProperties(Cluster cluster, Properties props) {
+        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+                ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+                ClusterHelper.getMessageBrokerImplClass(cluster));
 
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
-        //props in entity override the set props.
-        props.putAll(getEntityProperties(entity));
-        return props;
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                "broker.url", "tcp://localhost:61616?daemon=true");
+        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+            DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
     }
 
     protected CONFIGURATION getConfig(Properties props) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index fa645a5..d232aaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -79,14 +80,22 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
         new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
 
-    private final Tag lifecycle;
+    private final LifeCycle lifecycle;
 
-    public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
+    public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
         super(entity);
         this.lifecycle = lifecycle;
     }
 
-    public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+    public LifeCycle getLifecycle() {
+        return lifecycle;
+    }
+
+    public Tag getTag() {
+        return lifecycle.getTag();
+    }
+
+    public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
         throws FalconException {
         switch (entity.getEntityType()) {
         case FEED:
@@ -194,11 +203,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     protected boolean shouldPreProcess() throws FalconException {
-        if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null
-            || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
-            return false;
-        }
-        return true;
+        return !(EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0);
     }
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index d6115b2..f0864db 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -21,6 +21,7 @@ package org.apache.falcon.oozie.feed;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
@@ -45,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -74,10 +76,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
 
     public FeedReplicationCoordinatorBuilder(Feed entity) {
-        super(entity, Tag.REPLICATION);
+        super(entity, LifeCycle.REPLICATION);
     }
 
-    @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+    @Override
+    public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         if (feedCluster.getType() == ClusterType.TARGET) {
             List<Properties> props = new ArrayList<Properties>();
@@ -96,6 +99,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         return null;
     }
 
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.REPLICATE;
+    }
+
     private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
 
         // Different workflow for each source since hive credentials vary for each cluster
@@ -180,7 +188,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         }
 
         propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
-        props.putAll(FeedHelper.getUserWorkflowProperties("replication"));
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
 
         workflow.setConfiguration(getConfig(props));
         action.setWorkflow(workflow);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index eafef32..288e9de 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,7 +40,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
     protected static final String REPLICATION_ACTION_NAME = "replication";
 
     public FeedReplicationWorkflowBuilder(Feed entity) {
-        super(entity, Tag.REPLICATION);
+        super(entity, LifeCycle.REPLICATION);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -52,15 +53,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
         addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
 
         marshal(cluster, workflow, buildPath);
-        Properties props = getProperties(buildPath, wfName);
-        props.putAll(getWorkflowProperties());
-        return props;
-    }
-
-    private Properties getWorkflowProperties() {
-        Properties props = new Properties();
-        props.setProperty("falconDataOperation", "REPLICATE");
-        return props;
+        return getProperties(buildPath, wfName);
     }
 
     protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 2a67fd3..3c74485 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
@@ -46,7 +47,7 @@ import java.util.Properties;
  */
 public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
     public FeedRetentionCoordinatorBuilder(Feed entity) {
-        super(entity, Tag.RETENTION);
+        super(entity, LifeCycle.EVICTION);
     }
 
     @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
@@ -85,15 +86,13 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
 
         props.put("limit", feedCluster.getRetention().getLimit().toString());
 
-        props.put(WorkflowExecutionArgs.OPERATION.getName(),
-                WorkflowExecutionContext.EntityOperations.DELETE.name());
         props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
         props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
 
         props.put("falconInputFeeds", entity.getName());
         props.put("falconInPaths", IGNORE);
 
-        props.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
 
         WORKFLOW workflow = new WORKFLOW();
         Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
@@ -109,4 +108,9 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
         Path marshalPath = marshal(cluster, coord, coordPath);
         return Arrays.asList(getProperties(marshalPath, coordName));
     }
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.DELETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index 3aabb19..cbe055a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -39,7 +40,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
     private static final String EVICTION_ACTION_NAME = "eviction";
 
     public FeedRetentionWorkflowBuilder(Feed entity) {
-        super(entity, Tag.DEFAULT);
+        super(entity, LifeCycle.EVICTION);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -75,7 +76,6 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
 
     private Properties getWorkflowProperties() {
         Properties props = new Properties();
-        props.setProperty("falconDataOperation", "DELETE");
         props.setProperty("srcClusterName", "NA");
         return props;
     }
@@ -109,5 +109,4 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index d391032..a33fa62 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie.process;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.EntityUtil;
@@ -50,6 +51,7 @@ import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -64,7 +66,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
     private static final int THIRTY_MINUTES = 30 * 60 * 1000;
 
     public ProcessExecutionCoordinatorBuilder(Process entity) {
-        super(entity, Tag.DEFAULT);
+        super(entity, LifeCycle.EXECUTION);
     }
 
     @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
@@ -106,6 +108,11 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         return Arrays.asList(getProperties(marshalPath, coordName));
     }
 
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.GENERATE;
+    }
+
     private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
         coord.setName(coordName);
         org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 2eae7ca..865beaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
@@ -58,7 +59,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, USER_ACTION_NAME, }));
 
     protected ProcessExecutionWorkflowBuilder(Process entity) {
-        super(entity, Tag.DEFAULT);
+        super(entity, LifeCycle.EXECUTION);
     }
 
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -100,14 +101,13 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
 
         marshal(cluster, wfApp, buildPath);
         Properties props = getProperties(buildPath, wfName);
-        props.putAll(getWorkflowProperties(cluster));
+        props.putAll(getWorkflowProperties());
 
         return props;
     }
 
-    private Properties getWorkflowProperties(Cluster cluster) {
+    private Properties getWorkflowProperties() {
         Properties props = new Properties();
-        props.setProperty("falconDataOperation", "GENERATE");
         props.setProperty("srcClusterName", "NA");
         return props;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 1631d63..50cc0d4 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -49,15 +49,15 @@
         <arg>-timeStamp</arg>
         <arg>${timeStamp}</arg>
         <arg>-brokerImplClass</arg>
-        <arg>${wf:conf("broker.impl.class")}</arg>
+        <arg>${brokerImplClass}</arg>
         <arg>-brokerUrl</arg>
-        <arg>${wf:conf("broker.url")}</arg>
+        <arg>${brokerUrl}</arg>
         <arg>-userBrokerImplClass</arg>
         <arg>${userBrokerImplClass}</arg>
         <arg>-userBrokerUrl</arg>
         <arg>${userBrokerUrl}</arg>
         <arg>-brokerTTL</arg>
-        <arg>${wf:conf("broker.ttlInMins")}</arg>
+        <arg>${brokerTTL}</arg>
         <arg>-feedNames</arg>
         <arg>${feedNames}</arg>
         <arg>-feedInstancePaths</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 927aba3..e47895f 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -38,7 +38,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.process.AbstractTestBase;
@@ -49,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -189,10 +189,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
         Assert.assertEquals("${now(0,-40)}", outEventInstance);
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
+
+        verifyEntityProperties(feed, trgCluster,
+                WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+        verifyBrokerProperties(trgCluster, props);
 
         // verify the replication param that feed replicator depends on
         String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
@@ -208,7 +209,6 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
-        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed));
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
@@ -274,7 +274,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
 
         String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
-        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster, pathsWithPartitions);
 
         List<Properties> betaCoords = builder.buildCoords(betaTrgCluster, new Path("/beta/falcon/"));
         final COORDINATORAPP betaCoord = getCoordinator(trgMiniDFS,
@@ -283,7 +283,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
 
         pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
-        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions);
     }
 
     private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
@@ -302,9 +302,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         return parts;
     }
 
-    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
-                                 String pathsWithPartitions) throws JAXBException, IOException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster,
+                                 String pathsWithPartitions) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(aFeed, aCluster.getName());
         Date startDate = feedCluster.getValidity().getStart();
         Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
 
@@ -319,10 +320,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         List<String> args = replication.getArg();
         Assert.assertEquals(args.size(), 13);
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
 
         Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
         Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
@@ -331,7 +329,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
-        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
+
+        verifyEntityProperties(aFeed, aCluster,
+                WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+        verifyBrokerProperties(trgCluster, props);
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
@@ -415,10 +416,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
         Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
 
         final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
         final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
@@ -446,14 +444,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
-        assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString());
+        assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
+                wfPath.toString());
+
+        verifyEntityProperties(tableFeed, trgCluster,
+                WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+        verifyBrokerProperties(trgCluster, props);
     }
 
     private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -525,10 +527,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
 
         String feedDataPath = props.get("feedDataPath");
         String storageType = props.get("falconFeedStorageType");
@@ -549,9 +548,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
-        Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed));
 
         assertWorkflowRetries(coord);
+        verifyEntityProperties(feed, srcCluster,
+                WorkflowExecutionContext.EntityOperations.DELETE, props);
+        verifyBrokerProperties(srcCluster, props);
     }
 
     @Test (dataProvider = "secureOptions")
@@ -571,10 +572,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
 
         String feedDataPath = props.get("feedDataPath");
         String storageType = props.get("falconFeedStorageType");
@@ -595,9 +593,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
-        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
 
         assertWorkflowRetries(coord);
+        verifyBrokerProperties(srcCluster, props);
+        verifyEntityProperties(tableFeed, trgCluster,
+                WorkflowExecutionContext.EntityOperations.DELETE, props);
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
         assertHCatCredentials(
@@ -632,9 +632,4 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
             }
         }
     }
-
-    private String getLogPath(Cluster aCluster, Feed aFeed) {
-        Path logPath = EntityUtil.getLogPath(aCluster, aFeed);
-        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index a0962fc..b547c31 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.falcon.oozie.process;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -29,9 +30,13 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -48,6 +53,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -199,4 +205,49 @@ public class AbstractTestBase {
             Assert.assertEquals(action.getRetryInterval(), "1");
         }
     }
+
+    protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) {
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        return props;
+    }
+
+    protected void verifyEntityProperties(Entity entity, Cluster cluster,
+                                          WorkflowExecutionContext.EntityOperations operation,
+                                          HashMap<String, String> props) throws Exception {
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
+                entity.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
+                entity.getEntityType().name());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
+        Assert.assertEquals(props.get("falconDataOperation"), operation.name());
+    }
+
+    private String getLogPath(Cluster cluster, Entity entity) {
+        Path logPath = EntityUtil.getLogPath(cluster, entity);
+        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+    }
+
+    protected void verifyBrokerProperties(Cluster cluster, HashMap<String, String> props) {
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_URL.getName()),
+                ClusterHelper.getMessageBrokerUrl(cluster));
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName()),
+                ClusterHelper.getMessageBrokerImplClass(cluster));
+
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                "broker.url", "tcp://localhost:61616?daemon=true");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_URL.getName()), falconBrokerUrl);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName()),
+                falconBrokerImplClass);
+
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+                String.valueOf(3 * 24 * 60L));
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_TTL.getName()), jmsMessageTTL);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 45a8732..45badee 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -44,7 +44,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.bundle.CONFIGURATION;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.workflow.ACTION;
@@ -55,6 +54,7 @@ import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -186,19 +186,14 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertEquals(ds.getUriTemplate(),
                 FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
 
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
         assertEquals(props.get("mapred.job.priority"), "LOW");
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
-        assertLibExtensions(fs, coord, EntityType.PROCESS, null);
-    }
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
-    private String getLogPath(Process process) {
-        Path logPath = EntityUtil.getLogPath(cluster, process);
-        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+        assertLibExtensions(fs, coord, EntityType.PROCESS, null);
     }
 
     @Test
@@ -285,10 +280,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         // verify table and hive props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -298,7 +294,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -349,11 +344,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+        HashMap<String, String> props = getCoordProperties(coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -404,11 +399,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+        HashMap<String, String> props = getCoordProperties(coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -455,11 +450,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+        HashMap<String, String> props = getCoordProperties(coord);
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -546,10 +540,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
+        HashMap<String, String> props = getCoordProperties(coord);
 
         // verify table props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -558,7 +549,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
-        Assert.assertEquals(props.get("logDir"), getLogPath(process));
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         // verify the late data params
         Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -684,11 +677,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
+        HashMap<String, String> props = getCoordProperties(coord);
+        verifyEntityProperties(processEntity, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
 
         String[] expected = {
             WorkflowExecutionArgs.FEED_NAMES.getName(),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index feddfdd..91559a5 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -150,8 +150,8 @@ public class FalconPostProcessingTest {
         if (workflowUser != null) { // in case of user message, its NULL
             Assert.assertEquals(workflowUser, "falcon");
         }
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01T01:00Z");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01T01:00Z");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01-01-00");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 76f0ac0..24c6ec2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -37,6 +37,7 @@ import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
 
+                CurrentUser.authenticate(workflowUser);
                 java.util.Properties properties =
                         this.getWfEngine().getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");