You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/08 13:36:25 UTC

falcon git commit: FALCON-1231 Improve JobCompletionNotification Service

Repository: falcon
Updated Branches:
  refs/heads/master d08f8bd74 -> f4bd1e7e5


FALCON-1231 Improve JobCompletionNotification Service


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4bd1e7e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4bd1e7e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4bd1e7e

Branch: refs/heads/master
Commit: f4bd1e7e53ca4f113b78d3a5cf4e55aba99fbe93
Parents: d08f8bd
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Oct 8 16:41:59 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Oct 8 16:41:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/falcon/resource/InstancesResult.java |   7 +
 .../falcon/catalog/CatalogPartitionHandler.java |  15 ++
 .../falcon/entity/WorkflowNameBuilder.java      |  69 ++++--
 .../falcon/metadata/MetadataMappingService.java |  14 ++
 .../falcon/workflow/WorkflowExecutionArgs.java  |   3 +
 .../workflow/WorkflowExecutionContext.java      |  44 +++-
 .../workflow/WorkflowExecutionListener.java     |  31 +++
 .../WorkflowJobEndNotificationService.java      | 217 ++++++++++++++++---
 .../workflow/engine/AbstractWorkflowEngine.java |   2 +
 .../falcon/entity/TestWorkflowNameBuilder.java  |  18 ++
 .../workflow/WorkflowExecutionContextTest.java  |  17 ++
 .../WorkflowJobEndNotificationServiceTest.java  |  59 +++--
 messaging/pom.xml                               |   7 +
 .../falcon/messaging/JMSMessageConsumer.java    | 155 +++++++++++--
 .../messaging/JMSMessageConsumerTest.java       | 187 +++++++++++++---
 .../src/main/conf/oozie-site.xml                |  49 ++++-
 .../falcon/workflow/FalconPostProcessing.java   |  10 -
 .../workflow/engine/OozieWorkflowEngine.java    |  24 ++
 .../workflow/FalconPostProcessingTest.java      |   3 -
 .../falcon/rerun/handler/LateRerunHandler.java  |  15 ++
 .../falcon/rerun/handler/RetryHandler.java      |  19 ++
 webapp/src/conf/oozie/conf/oozie-site.xml       |  48 ++++
 23 files changed, 877 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02a65a1..c845062 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1231 Improve JobCompletionNotification Service(Pallavi Rao)
+
     FALCON-1157 Build error when using maven 3.3.x(Venkat Ramachandran via Pallavi Rao)
 
     FALCON-1477 Adding "-debug" option to Falcon CLI for debug statements to stdout(Narayan Periwal via Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index 76bb4b0..e05eeeb 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -109,6 +109,9 @@ public class InstancesResult extends APIResult {
         public Date endTime;
 
         @XmlElement
+        public int runId;
+
+        @XmlElement
         public String details;
 
         @XmlElement
@@ -154,6 +157,10 @@ public class InstancesResult extends APIResult {
             return endTime;
         }
 
+        public int getRunId() {
+            return runId;
+        }
+
         public InstanceAction[] getActions() {
             return actions;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
index d0b09df..cccb4f8 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
@@ -295,4 +295,19 @@ public class CatalogPartitionHandler implements WorkflowExecutionListener{
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
         //no-op
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index 6890594..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -56,13 +57,13 @@ public class WorkflowNameBuilder<T extends Entity> {
     }
 
     public Tag getWorkflowTag(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? null
-                : WorkflowName.getTagAndSuffixes(entity, workflowName).first;
+        return WorkflowName.getTagAndSuffixes(workflowName) == null ? null
+                : WorkflowName.getTagAndSuffixes(workflowName).first;
     }
 
     public String getWorkflowSuffixes(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? ""
-                : WorkflowName.getTagAndSuffixes(entity, workflowName).second;
+        return WorkflowName.getTagAndSuffixes(workflowName) == null ? ""
+                : WorkflowName.getTagAndSuffixes(workflowName).second;
     }
 
     /**
@@ -70,6 +71,7 @@ public class WorkflowNameBuilder<T extends Entity> {
      */
     public static class WorkflowName {
         private static final String SEPARATOR = "_";
+        private static final Pattern WF_NAME_PATTERN;
 
         private String prefix;
         private String entityType;
@@ -77,6 +79,32 @@ public class WorkflowNameBuilder<T extends Entity> {
         private String entityName;
         private List<String> suffixes;
 
+        static {
+            StringBuilder typePattern = new StringBuilder("(");
+            for (EntityType type : EntityType.values()) {
+                typePattern.append(type.name());
+                typePattern.append("|");
+            }
+            typePattern = typePattern.deleteCharAt(typePattern.length() - 1);
+            typePattern.append(")");
+            StringBuilder tagsPattern = new StringBuilder("(");
+            for (Tag tag : Tag.values()) {
+                tagsPattern.append(tag.name());
+                tagsPattern.append("|");
+            }
+            tagsPattern = tagsPattern.deleteCharAt(tagsPattern.length() - 1);
+            tagsPattern.append(")");
+
+            String name = "([a-zA-Z][\\-a-zA-Z0-9]*)";
+
+            String suffix = "([_A-Za-z0-9-.]*)";
+
+            String namePattern = PREFIX + SEPARATOR + typePattern + SEPARATOR + tagsPattern
+                    + SEPARATOR + name + suffix;
+
+            WF_NAME_PATTERN = Pattern.compile(namePattern);
+        }
+
         public WorkflowName(String prefix, String entityType, String tag,
                             String entityName, List<String> suffixes) {
             this.prefix = prefix;
@@ -100,28 +128,27 @@ public class WorkflowNameBuilder<T extends Entity> {
             return builder.toString();
         }
 
-        public static Pair<Tag, String> getTagAndSuffixes(Entity entity,
-                                                          String workflowName) {
-
-            StringBuilder namePattern = new StringBuilder(PREFIX + SEPARATOR
-                    + entity.getEntityType().name() + SEPARATOR + "(");
-            for (Tag tag : Tag.values()) {
-                namePattern.append(tag.name());
-                namePattern.append("|");
+        public static Pair<Tag, String> getTagAndSuffixes(String workflowName) {
+            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
+            if (matcher.matches()) {
+                matcher.reset();
+                if (matcher.find()) {
+                    String tag = matcher.group(2);
+                    String suffixes = matcher.group(4);
+                    return new Pair<>(Tag.valueOf(tag), suffixes);
+                }
             }
-            namePattern = namePattern.deleteCharAt(namePattern.length() - 1);
-            namePattern.append(")" + SEPARATOR + entity.getName()
-                    + "([_A-Za-z0-9-.]*)");
-
-            Pattern pattern = Pattern.compile(namePattern.toString());
+            return null;
+        }
 
-            Matcher matcher = pattern.matcher(workflowName);
+        public static Pair<String, EntityType> getEntityNameAndType(String workflowName) {
+            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
             if (matcher.matches()) {
                 matcher.reset();
                 if (matcher.find()) {
-                    String tag = matcher.group(1);
-                    String suffixes = matcher.group(2);
-                    return new Pair<Tag, String>(Tag.valueOf(tag), suffixes);
+                    String type = matcher.group(1);
+                    String name = matcher.group(3);
+                    return new Pair<>(name, EntityType.valueOf(type));
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index ef9da45..56fbde0 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -297,6 +297,20 @@ public class MetadataMappingService
         // do nothing since lineage is only recorded for successful workflow
     }
 
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // TBD
+    }
 
 
     private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 9456fb9..d2430a2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -35,6 +35,9 @@ public enum WorkflowExecutionArgs {
     // where
     CLUSTER_NAME("cluster", "name of the current cluster"),
     OPERATION("operation", "operation like generate, delete, replicate"),
+    // Exactly the same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING
+    // and the values in conf.
+    DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false),
 
     // who
     WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 4454239..45b6d23 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -62,12 +62,12 @@ public class WorkflowExecutionContext {
     /**
      * Workflow execution status.
      */
-    public enum Status {SUCCEEDED, FAILED}
+    public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED}
 
     /**
      * Workflow execution type.
      */
-    public enum Type {PRE_PROCESSING, POST_PROCESSING}
+    public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
 
     /**
      * Entity operations supported.
@@ -107,6 +107,10 @@ public class WorkflowExecutionContext {
         return context.get(arg);
     }
 
+    public void setValue(WorkflowExecutionArgs arg, String value) {
+        context.put(arg, value);
+    }
+
     public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
         return context.containsKey(arg) ? context.get(arg) : defaultValue;
     }
@@ -128,10 +132,22 @@ public class WorkflowExecutionContext {
         return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
     }
 
+    public boolean hasWorkflowTimedOut() {
+        return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
+    public boolean hasWorkflowBeenKilled() {
+        return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
     public String getContextFile() {
         return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
     }
 
+    public Status getWorkflowStatus() {
+        return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
     public String getLogDir() {
         return getValue(WorkflowExecutionArgs.LOG_DIR);
     }
@@ -211,7 +227,10 @@ public class WorkflowExecutionContext {
     }
 
     public EntityOperations getOperation() {
-        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+        if (getValue(WorkflowExecutionArgs.OPERATION) != null) {
+            return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+        }
+        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION));
     }
 
     public String getOutputFeedNames() {
@@ -282,6 +301,19 @@ public class WorkflowExecutionContext {
         return creationTime;
     }
 
+    public long getWorkflowStartTime() {
+        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
+    }
+
+    public long getWorkflowEndTime() {
+        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_END_TIME));
+    }
+
+
+    public Type getContextType() {
+        return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
+    }
+
     /**
      * this method is invoked from with in the workflow.
      *
@@ -397,7 +429,11 @@ public class WorkflowExecutionContext {
     }
 
     public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) {
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, Type.POST_PROCESSING.name());
+        return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING);
+    }
+
+    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties, Type type) {
+        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
         return new WorkflowExecutionContext(wfProperties);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
index 2d3a477..7bf14f2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
@@ -25,7 +25,38 @@ import org.apache.falcon.FalconException;
  */
 public interface WorkflowExecutionListener {
 
+    /**
+     * Invoked when a workflow succeeds.
+     * @param context
+     * @throws FalconException
+     */
     void onSuccess(WorkflowExecutionContext context) throws FalconException;
 
+    /**
+     * Invoked when a workflow fails.
+     * @param context
+     * @throws FalconException
+     */
     void onFailure(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked on start of a workflow. Basically, when the workflow is RUNNING.
+     * @param context
+     * @throws FalconException
+     */
+    void onStart(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked when a workflow is suspended.
+     * @param context
+     * @throws FalconException
+     */
+    void onSuspend(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked when a workflow is in waiting state.
+     * @param context
+     * @throws FalconException
+     */
+    void onWait(WorkflowExecutionContext context) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index c4f8843..5c75f5c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -21,19 +21,24 @@ package org.apache.falcon.workflow;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A workflow job end notification service.
@@ -46,11 +51,21 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
 
+    // Maintain a cache of context built, so we don't have to query Oozie for every state change.
+    private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
+
+    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+
     @Override
     public String getName() {
         return SERVICE_NAME;
     }
 
+    // Mainly for test
+    Map<String, Properties> getContextMap() {
+        return contextMap;
+    }
+
     @Override
     public void init() throws FalconException {
         String listenerClassNames = StartupProperties.get().getProperty(
@@ -83,9 +98,34 @@ public class WorkflowJobEndNotificationService implements FalconService {
     }
 
     public void notifyFailure(WorkflowExecutionContext context) {
+        notifyWorkflowEnd(context);
+    }
+
+    public void notifySuccess(WorkflowExecutionContext context) {
+        notifyWorkflowEnd(context);
+    }
+
+    public void notifyStart(WorkflowExecutionContext context) {
+        // Start notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
+        for (WorkflowExecutionListener listener : listeners) {
+            try {
+                listener.onStart(context);
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
+        }
+    }
+
+    public void notifySuspend(WorkflowExecutionContext context) {
+        // Suspend notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
-                listener.onFailure(context);
+                listener.onSuspend(context);
             } catch (Throwable t) {
                 // do not rethrow as other listeners do not get a chance
                 LOG.error("Error in listener {}", listener.getClass().getName(), t);
@@ -93,19 +133,142 @@ public class WorkflowJobEndNotificationService implements FalconService {
         }
 
         instrumentAlert(context);
+        contextMap.remove(context.getWorkflowId());
     }
 
-    public void notifySuccess(WorkflowExecutionContext context) {
+    public void notifyWait(WorkflowExecutionContext context) {
+        // Wait notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
-                listener.onSuccess(context);
+                listener.onWait(context);
             } catch (Throwable t) {
                 // do not rethrow as other listeners do not get a chance
                 LOG.error("Error in listener {}", listener.getClass().getName(), t);
             }
         }
+    }
 
-        instrumentAlert(context);
+    // The method retrieves the conf from the cache if it is in cache.
+    // Else, queries WF Engine to retrieve the conf of the workflow
+    private void updateContextFromWFConf(WorkflowExecutionContext context) {
+        try {
+            Properties wfProps = contextMap.get(context.getWorkflowId());
+            if (wfProps == null) {
+                Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName());
+                // Entity can be null in case of delete. Engine will generate notifications for instance kills.
+                // But, the entity would no longer be in the config store.
+                if (entity == null) {
+                    return;
+                }
+                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                    try {
+                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine()
+                                .getJobDetails(cluster, context.getWorkflowId()).getInstances();
+                        if (instances != null && instances.length > 0) {
+                            wfProps = getWFProps(instances[0].getWfParams());
+                            // Required by RetryService. But, is not part of conf.
+                            wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
+                                    Integer.toString(instances[0].getRunId()));
+                        }
+                    } catch (FalconException e) {
+                        // Do Nothing. The workflow may not have been deployed on this cluster.
+                        continue;
+                    }
+                    contextMap.put(context.getWorkflowId(), wfProps);
+                }
+            }
+
+            // No extra props to enhance the context with.
+            if (wfProps == null || wfProps.isEmpty()) {
+                return;
+            }
+
+            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+                if (wfProps.containsKey(arg.getName())) {
+                    context.setValue(arg, wfProps.getProperty(arg.getName()));
+                }
+            }
+
+        } catch (FalconException e) {
+            LOG.error("Unable to retrieve entity {} of type {} from config store.", e);
+        }
+    }
+
+    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
+        Properties props = new Properties();
+        for (InstancesResult.KeyValuePair kv : wfParams) {
+            props.put(kv.getKey(), kv.getValue());
+        }
+        return props;
+    }
+
+
+    // This method handles both success and failure notifications.
+    private void notifyWorkflowEnd(WorkflowExecutionContext context) {
+        // Need to distinguish notification from post processing for backward compatibility
+        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
+            boolean engineNotifEnabled = false;
+            try {
+                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
+                        .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
+            } catch (FalconException e) {
+                LOG.debug("Unable to determine if the notification is enabled on the wf engine. Assuming not.", e);
+            }
+            // Ignore the message from post processing as there will be one more from Oozie.
+            if (engineNotifEnabled) {
+                LOG.info("Ignoring message from post processing as engine notification is enabled.");
+                return;
+            } else {
+                updateContextWithTime(context);
+            }
+        } else {
+            updateContextFromWFConf(context);
+        }
+
+        LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
+
+        for (WorkflowExecutionListener listener : listeners) {
+            try {
+                if (context.hasWorkflowSucceeded()) {
+                    listener.onSuccess(context);
+                    instrumentAlert(context);
+                } else {
+                    listener.onFailure(context);
+                    if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
+                        instrumentAlert(context);
+                    }
+                }
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
+        }
+
+        contextMap.remove(context.getWorkflowId());
+    }
+
+    // In case of notifications coming from post processing, start and end time need to be populated.
+    private void updateContextWithTime(WorkflowExecutionContext context) {
+        try {
+            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
+                    .getJobDetails(context.getClusterName(), context.getWorkflowId());
+            Date startTime = result.getInstances()[0].startTime;
+            Date endTime = result.getInstances()[0].endTime;
+            Date now = new Date();
+            if (startTime == null) {
+                startTime = now;
+            }
+            if (endTime == null) {
+                endTime = now;
+            }
+            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
+            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
+        } catch(FalconException e) {
+            LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster "
+                    + context.getClusterName(), e);
+        }
     }
 
     private void instrumentAlert(WorkflowExecutionContext context) {
@@ -117,27 +280,31 @@ public class WorkflowJobEndNotificationService implements FalconService {
         String workflowUser = context.getWorkflowUser();
         String nominalTime = context.getNominalTimeAsISO8601();
         String runId = String.valueOf(context.getWorkflowRunId());
+        Date now = new Date();
+        // Start and/or End time may not be set in case of workflow suspend
+        Date endTime;
+        if (context.getWorkflowEndTime() == 0) {
+            endTime = now;
+        } else {
+            endTime = new Date(context.getWorkflowEndTime());
+        }
 
-        try {
-            CurrentUser.authenticate(context.getWorkflowUser());
-            AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-            InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
-            Date startTime = result.getInstances()[0].startTime;
-            Date endTime = result.getInstances()[0].endTime;
-            Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+        Date startTime;
+        if (context.getWorkflowStartTime() == 0) {
+            startTime = now;
+        } else {
+            startTime = new Date(context.getWorkflowStartTime());
+        }
+        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
 
-            if (context.hasWorkflowFailed()) {
-                GenericAlert.instrumentFailedInstance(clusterName, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
-            } else {
-                GenericAlert.instrumentSucceededInstance(clusterName, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), duration);
-            }
-        } catch (FalconException e) {
-            // Logging an error and ignoring since there are listeners for extensions
-            LOG.error("Instrumenting alert failed for: " + context, e);
+        if (context.hasWorkflowFailed()) {
+            GenericAlert.instrumentFailedInstance(clusterName, entityType,
+                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                    SchemaHelper.formatDateUTC(startTime), "", "", duration);
+        } else {
+            GenericAlert.instrumentSucceededInstance(clusterName, entityType,
+                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                    SchemaHelper.formatDateUTC(startTime), duration);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 0b560bb..8b3460a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -105,4 +105,6 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
                                                       List<LifeCycle> lifeCycles) throws FalconException;
+
+    public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
index 6060731..5b1af78 100644
--- a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
+++ b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
@@ -17,10 +17,13 @@
  */
 package org.apache.falcon.entity;
 
+import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -88,4 +91,19 @@ public class TestWorkflowNameBuilder {
                 "FALCON_PROCESS_DEFAULT_agg-logs");
 
     }
+
+    @Test(dataProvider = "workflowNames")
+    public void workflowNameTypeTest(String wfName, Pair<String, EntityType> nameType) {
+        Assert.assertEquals(WorkflowNameBuilder.WorkflowName.getEntityNameAndType(wfName), nameType);
+    }
+
+    @DataProvider(name = "workflowNames")
+    public Object[][] getWorkflowNames() {
+        return new Object[][] {
+            {"FALCON_PROCESS_DEFAULT_agg-logs", new Pair<>("agg-logs", EntityType.PROCESS)},
+            {"FALCON_FEED_REPLICATION_raw-logs", new Pair<>("raw-logs", EntityType.FEED)},
+            {"FALCON_FEED_RETENTION_logs2", new Pair<>("logs2", EntityType.FEED)},
+            {"FALCON_FEED_REPLICATION_logs_colo1", new Pair<>("logs", EntityType.FEED)},
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 65a057d..4fdc1e9 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -24,6 +24,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.Date;
+
 
 /**
  * A test for WorkflowExecutionContext.
@@ -86,6 +88,7 @@ public class WorkflowExecutionContextTest {
     @Test
     public void testHasWorkflowSucceeded() throws Exception {
         Assert.assertTrue(context.hasWorkflowSucceeded());
+        Assert.assertEquals(context.getWorkflowStatus(), WorkflowExecutionContext.Status.SUCCEEDED);
     }
 
     @Test
@@ -240,6 +243,18 @@ public class WorkflowExecutionContextTest {
     }
 
     @Test
+    public void testWorkflowStartEnd() throws Exception {
+        Assert.assertEquals(context.getWorkflowEndTime() - context.getWorkflowStartTime(), 1000000);
+    }
+
+    @Test
+    public void testSetAndGetValue() throws Exception {
+        context.setValue(WorkflowExecutionArgs.RUN_ID, "10");
+        Assert.assertEquals(context.getValue(WorkflowExecutionArgs.RUN_ID), "10");
+        context.setValue(WorkflowExecutionArgs.RUN_ID, "1");
+    }
+
+    @Test
     public void testSerializeDeserialize() throws Exception {
         String contextFile = context.getContextFile();
         context.serialize();
@@ -318,6 +333,8 @@ public class WorkflowExecutionContextTest {
 
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
             "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+            "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()),
+            "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000),
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index b7df443..1a9597b 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -22,10 +22,13 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.util.Date;
+import java.util.Properties;
+
 /**
  * A test for WorkflowJobEndNotificationService.
  */
@@ -55,15 +58,17 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
     private WorkflowJobEndNotificationService service;
     private WorkflowExecutionContext savedContext;
 
-    @BeforeMethod
+    @BeforeClass
     public void setUp() throws Exception {
         service = new WorkflowJobEndNotificationService();
         savedContext = WorkflowExecutionContext.create(getTestMessageArgs(),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         Assert.assertNotNull(savedContext);
+        service.init();
+        service.registerListener(this);
     }
 
-    @AfterMethod
+    @AfterClass
     public void tearDown() throws Exception {
         service.destroy();
     }
@@ -73,29 +78,30 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
         Assert.assertEquals(service.getName(), WorkflowJobEndNotificationService.SERVICE_NAME);
     }
 
-    @Test
-    public void testInit() throws Exception {
-        String listenerClassNames = StartupProperties.get().getProperty(
-                "workflow.execution.listeners");
-        Assert.assertEquals(listenerClassNames, "");
-
+    @Test(priority = -1)
+    public void testBasic() throws Exception {
         try {
-            StartupProperties.get().setProperty("workflow.execution.listeners",
-                    "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
-            listenerClassNames = StartupProperties.get().getProperty(
-                    "workflow.execution.listeners");
-            Assert.assertEquals(listenerClassNames,
-                    "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
-
-            service.init();
             notifyFailure(savedContext);
             notifySuccess(savedContext);
         } finally {
-            service.unregisterListener(this);
             StartupProperties.get().setProperty("workflow.execution.listeners", "");
         }
     }
 
+    @Test
+    public void testNotificationsFromEngine() throws FalconException {
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+        // Pretend the start was already notified
+        Properties wfProps = new Properties();
+        wfProps.put(WorkflowExecutionArgs.CLUSTER_NAME.name(), CLUSTER_NAME);
+        service.getContextMap().put("workflow-01-00", wfProps);
+
+        // Should retrieve from cache.
+        service.notifySuspend(context);
+    }
+
     @Override
     public void onSuccess(WorkflowExecutionContext context) throws FalconException {
         Assert.assertNotNull(context);
@@ -108,6 +114,19 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
         Assert.assertEquals(context.entrySet().size(), 28);
     }
 
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+
+    }
+
     private void notifyFailure(WorkflowExecutionContext context) {
         service.notifyFailure(context);
     }
@@ -150,6 +169,8 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
 
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
             "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+            "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()),
+            "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000),
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 918de63..e64a3f8 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -176,5 +176,12 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index d3178fb..bbb5d9b 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -20,12 +20,17 @@ package org.apache.falcon.messaging;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.WorkflowNameBuilder;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.messaging.util.MessagingUtil;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.json.JSONException;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,12 +42,17 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import java.lang.reflect.InvocationTargetException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 
 /**
  * Subscribes to the falcon topic for handling retries and alerts.
@@ -94,22 +104,22 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
 
     @Override
     public void onMessage(Message message) {
-        MapMessage mapMessage = (MapMessage) message;
         LOG.info("Received JMS message {}", message.toString());
-
         try {
-            WorkflowExecutionContext context = createContext(mapMessage);
-            LOG.info("Created context from JMS message {}", context);
-
-            // Login the user so listeners can access FS and WfEngine as this user
-            CurrentUser.authenticate(context.getWorkflowUser());
-
-            if (context.hasWorkflowFailed()) {
-                onFailure(context);
-            } else if (context.hasWorkflowSucceeded()) {
-                onSuccess(context);
+            if (message instanceof MapMessage) {
+                MapMessage mapMessage = (MapMessage) message;
+                WorkflowExecutionContext context = createContext(mapMessage);
+                LOG.info("Created context from Falcon JMS message {}", context);
+                invokeListener(context);
+            // Due to backward compatibility, need to handle messages from post processing too.
+            // Hence cannot use JMS selectors.
+            } else if (shouldHandle(message)) {
+                TextMessage textMessage = (TextMessage) message;
+                WorkflowExecutionContext context = createContext(textMessage);
+                LOG.info("Created context from Oozie JMS message {}", context);
+                invokeListener(context);
             }
-        } catch (JMSException e) {
+        } catch (Exception e) {
             String errorMessage = "Error in onMessage for topicSubscriber of topic: "
                     + topicName + ", Message: " + message.toString();
             LOG.info(errorMessage, e);
@@ -117,6 +127,117 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         }
     }
 
+    // Creates context from the JMS notification of the workflow engine
+    private WorkflowExecutionContext createContext(TextMessage message) throws JMSException, FalconException {
+        try {
+            // Example Workflow Job in FAILED state:
+            // {"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved",
+            //        "id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}
+            JSONObject json = new JSONObject(message.getText());
+            long currentTime = System.currentTimeMillis();
+            Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<>();
+            wfProperties.put(WorkflowExecutionArgs.STATUS, json.getString("status"));
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_ID, json.getString("id"));
+            wfProperties.put(WorkflowExecutionArgs.WF_START_TIME, json.isNull("startTime")? Long.toString(currentTime)
+                    : json.getString("startTime"));
+            wfProperties.put(WorkflowExecutionArgs.WF_END_TIME, json.isNull("endTime")? Long.toString(currentTime)
+                    : json.getString("endTime"));
+            if (!json.isNull("nominalTime")) {
+                wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
+                        getNominalTimeString(Long.parseLong(json.getString("nominalTime"))));
+            }
+            Pair<String, EntityType> entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                    message.getStringProperty("appName"));
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
+
+            String appType = message.getStringProperty("appType");
+            return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
+
+        } catch (JSONException e) {
+            throw new FalconException("Unable to build a context from the JMS message.", e);
+        }
+    }
+
+    private String getNominalTimeString(long timeInMillis) {
+        Date time = new Date(timeInMillis);
+        final String format = "yyyy-MM-dd-HH-mm";
+        DateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return dateFormat.format(time);
+    }
+
+    private void invokeListener(WorkflowExecutionContext context) {
+        // Login the user so listeners can access FS and WfEngine as this user
+        CurrentUser.authenticate(context.getWorkflowUser());
+
+        WorkflowExecutionContext.Status status = WorkflowExecutionContext.Status.valueOf(
+                context.getValue(WorkflowExecutionArgs.STATUS));
+
+        // Handle only timeout and wait notifications of coord
+        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
+            switch(status) {
+            case TIMEDOUT:
+                jobEndNotificationService.notifyFailure(context);
+                break;
+            case WAITING:
+                jobEndNotificationService.notifyWait(context);
+                break;
+            default:
+                break;
+            }
+        } else {
+            switch(status) {
+            case KILLED:
+            case FAILED:
+                jobEndNotificationService.notifyFailure(context);
+                break;
+            case SUCCEEDED:
+                jobEndNotificationService.notifySuccess(context);
+                break;
+            case SUSPENDED:
+                jobEndNotificationService.notifySuspend(context);
+                break;
+            case RUNNING:
+                jobEndNotificationService.notifyStart(context);
+                break;
+            default :
+                throw new IllegalArgumentException("Not valid Status of workflow");
+            }
+        }
+    }
+
+    // Since Oozie has a system level JMS connection info, Falcon should ensure it is handling notifications
+    // of Falcon entities only.
+    private boolean shouldHandle(Message message) {
+        try {
+            String appType = message.getStringProperty("appType");
+            // Handle all workflow job notifications for falcon workflows
+            if (appType != null
+                    && WorkflowExecutionContext.Type.WORKFLOW_JOB == WorkflowExecutionContext.Type.valueOf(appType)
+                    && WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                        message.getStringProperty("appName")) != null) {
+                return true;
+            }
+
+            // Handle coord notification for falcon workflows only for WAITING and TIMED_OUT.
+            if (appType != null
+                    && WorkflowExecutionContext.Type.COORDINATOR_ACTION
+                        == WorkflowExecutionContext.Type.valueOf(appType)
+                    && WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                        message.getStringProperty("appName")) != null) {
+                String status = message.getStringProperty("eventStatus");
+                if (status != null && ("WAITING".equals(status) || "FAILURE".equals(status))) {
+                    return true;
+                }
+            }
+        } catch (JMSException e) {
+            LOG.error("Error while parsing the message header", e);
+        }
+        return false;
+    }
+
     private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
         // for backwards compatibility, read all args from message
         Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
@@ -130,14 +251,6 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         return WorkflowExecutionContext.create(wfProperties);
     }
 
-    public void onFailure(WorkflowExecutionContext context) {
-        jobEndNotificationService.notifyFailure(context);
-    }
-
-    public void onSuccess(WorkflowExecutionContext context) {
-        jobEndNotificationService.notifySuccess(context);
-    }
-
     @Override
     public void onException(JMSException ignore) {
         String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 7356ee3..dee7c47 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -26,10 +26,11 @@ import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.mockito.Mockito;
 import org.mortbay.log.Log;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.jms.Connection;
@@ -37,7 +38,9 @@ import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
+import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.io.IOException;
 import java.util.HashMap;
@@ -54,17 +57,26 @@ public class JMSMessageConsumerTest {
     private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
     private BrokerService broker;
 
-    @BeforeClass
+    private JMSMessageConsumer subscriber;
+    private WorkflowJobEndNotificationService jobEndService;
+
+    @BeforeMethod
     public void setup() throws Exception {
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
         broker.setBrokerName("localhost");
+        jobEndService = Mockito.mock(WorkflowJobEndNotificationService.class);
         broker.start();
-        broker.deleteAllMessages();
+        //Comma separated topics are supported in startup properties
+        subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
+                BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService);
+
+        subscriber.startSubscriber();
     }
 
-    public void sendMessages(String topic) throws JMSException, FalconException, IOException {
+    public void sendMessages(String topic, WorkflowExecutionContext.Type type)
+        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -74,32 +86,100 @@ public class JMSMessageConsumerTest {
         javax.jms.MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
-        for (int i = 0; i < 3; i++) {
-            WorkflowExecutionContext context = WorkflowExecutionContext.create(
-                    getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
+        for (int i = 0; i < 5; i++) {
+            Message message = null;
 
-            MapMessage message = session.createMapMessage();
-            for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
-                message.setString(entry.getKey().getName(), entry.getValue());
+            switch(type) {
+            case POST_PROCESSING:
+                message = getMockFalconMessage(i, session);
+                break;
+            case WORKFLOW_JOB:
+                message = getMockOozieMessage(i, session);
+                break;
+            case COORDINATOR_ACTION:
+                message = getMockOozieCoordMessage(i, session);
+            default:
+                break;
             }
-
             Log.debug("Sending:" + message);
             producer.send(message);
         }
+    }
 
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(
-                getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
-
-        MapMessage mapMessage = session.createMapMessage();
-        for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
-            mapMessage.setString(entry.getKey().getName(), entry.getValue());
+    private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
+        TextMessage message = session.createTextMessage();
+        message.setStringProperty("appType", "WORKFLOW_JOB");
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        message.setStringProperty("user", "falcon");
+        switch(i % 4) {
+        case 0:
+            message.setText("{\"status\":\"RUNNING\",\"id\":\"0000042-130618221729631-oozie-oozi-W\""
+                    + ",\"startTime\":1342915200000}");
+            break;
+        case 1:
+            message.setText("{\"status\":\"FAILED\",\"errorCode\":\"EL_ERROR\","
+                    + "\"errorMessage\":\"variable [dummyvalue] cannot be resolved\","
+                    + "\"id\":\"0000042-130618221729631-oozie-oozi-W\",\"startTime\":1342915200000,"
+                    + "\"endTime\":1366672183543}");
+            break;
+        case 2:
+            message.setText("{\"status\":\"SUCCEEDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\""
+                    + ",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\",\"endTime\":1366676224154}");
+            break;
+        case 3:
+            message.setText("{\"status\":\"SUSPENDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\"}");
+            break;
+        default:
         }
+        return message;
+    }
 
-        Log.debug("Sending:" + mapMessage);
-        producer.send(mapMessage);
+    private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException {
+        TextMessage message = session.createTextMessage();
+        message.setStringProperty("appType", "COORDINATOR_ACTION");
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        message.setStringProperty("user", "falcon");
+        switch(i % 5) {
+        case 0:
+            message.setText("{\"status\":\"WAITING\",\"nominalTime\":1310342400000,\"missingDependency\""
+                    + ":\"hdfs://gsbl90107.blue.com:8020/user/john/dir1/file1\","
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+            message.setStringProperty("eventStatus", "WAITING");
+            break;
+        case 1:
+            message.setText("{\"status\":\"RUNNING\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+            message.setStringProperty("eventStatus", "STARTED");
+            break;
+        case 2:
+            message.setText("{\"status\":\"SUCCEEDED\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\","
+                    + "\"endTime\":1366677082799}");
+            message.setStringProperty("eventStatus", "SUCCESS");
+            break;
+        case 3:
+            message.setText("{\"status\":\"FAILED\",\"errorCode\":\"E0101\",\"errorMessage\":"
+                    + "\"dummyError\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+            message.setStringProperty("eventStatus", "FAILURE");
+            break;
+        case 4:
+            message.setText("{\"status\":\"TIMEDOUT\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+            message.setStringProperty("eventStatus", "FAILURE");
+        default:
+        }
+        return message;
     }
 
-    private String[] getMockFalconMessage(int i) {
+    private Message getMockFalconMessage(int i, Session session) throws FalconException, JMSException {
         Map<String, String> message = new HashMap<String, String>();
         message.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS);
         message.put(WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL);
@@ -127,39 +207,78 @@ public class JMSMessageConsumerTest {
             args[index++] = entry.getValue();
         }
 
-        return args;
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args, WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        MapMessage jmsMessage = session.createMapMessage();
+        for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
+            jmsMessage.setString(entry.getKey().getName(), entry.getValue());
+        }
+
+        return jmsMessage;
     }
 
     @Test
     public void testSubscriber() {
         try {
-            //Comma separated topics are supported in startup properties
-            JMSMessageConsumer subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
-                    BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
-            subscriber.startSubscriber();
-            sendMessages(TOPIC_NAME);
+            sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
 
             final BrokerView adminView = broker.getAdminView();
 
             Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
-            Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
             Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
 
-            sendMessages(SECONDARY_TOPIC_NAME);
+            sendMessages(SECONDARY_TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
 
             Assert.assertEquals(adminView.getTotalEnqueueCount(), 18);
             Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
             Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
-
-            subscriber.closeSubscriber();
         } catch (Exception e) {
             Assert.fail("This should not have thrown an exception.", e);
         }
     }
 
-    @AfterClass
-    public void tearDown() throws Exception {
-        broker.deleteAllMessages();
+    @Test
+    public void testJMSMessagesFromOozie() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+        // Async operations. Give some time for messages to be processed.
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, Mockito.times(2)).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+    }
+
+    @Test
+    public void testJMSMessagesForOozieCoord() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.COORDINATOR_ACTION);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 12);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+        // Async operations. Give some time for messages to be processed.
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.times(1)).notifyWait(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.times(1)).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception{
         broker.stop();
+        subscriber.closeSubscriber();
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie-el-extensions/src/main/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/conf/oozie-site.xml b/oozie-el-extensions/src/main/conf/oozie-site.xml
index 0925b41..5ef7f2a 100644
--- a/oozie-el-extensions/src/main/conf/oozie-site.xml
+++ b/oozie-el-extensions/src/main/conf/oozie-site.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0"?>
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
@@ -205,5 +205,52 @@
             EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
         </description>
     </property>
+    <!-- Required to Notify Falcon on Workflow job status. -->
+    <property>
+        <name>oozie.services.ext</name>
+        <value>
+            org.apache.oozie.service.JMSAccessorService,
+            org.apache.oozie.service.JMSTopicService,
+            org.apache.oozie.service.EventHandlerService
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.EventHandlerService.event.listeners</name>
+        <value>
+            org.apache.oozie.jms.JMSJobEventListener
+        </value>
+    </property>
+    <property>
+        <name>oozie.jms.producer.connection.properties</name>
+        <value>
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+        </value>
+    </property>
+    <property>
+    <name>oozie.service.JMSTopicService.topic.name</name>
+    <value>
+        WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+    </value>
+    <description>
+    Topic options are ${username} or a fixed string which can be specified as default or for a
+    particular job type.
+    For example, to have a fixed string topic for workflows, coordinators and bundles,
+    specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+    where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+    Following example defines topic for workflow job, workflow action, coordinator job, coordinator action,
+    bundle job and bundle action
+    WORKFLOW=workflow,
+    COORDINATOR=coordinator,
+    BUNDLE=bundle
+    For jobs with no defined topic, default topic will be ${username}
+    </description>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.prefix</name>
+        <value>FALCON.</value>
+        <description>
+            This can be used to add a prefix to the topic in oozie.service.JMSTopicService.topic.name. For example: oozie.
+        </description>
+    </property>
 </configuration>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index cff1187..4961896 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -62,9 +62,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG.info("Moving logs {}", context);
         invokeLogProducer(context);
 
-        LOG.info("Sending falcon message {}", context);
-        invokeFalconMessageProducer(context);
-
         return 0;
     }
 
@@ -75,13 +72,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
-    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
-        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
-                .type(JMSMessageProducer.MessageType.FALCON)
-                .build();
-        jmsMessageProducer.sendMessage();
-    }
-
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 6660af1..09c29ab 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.JMSConnectionInfo;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
@@ -589,6 +591,25 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
     }
 
+    @Override
+    public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+        OozieClient client = OozieClientFactory.get(cluster);
+        try {
+            JMSConnectionInfo jmsConnection = client.getJMSConnectionInfo();
+            if (jmsConnection != null && !jmsConnection.getJNDIProperties().isEmpty()){
+                String falconTopic  = StartupProperties.get().getProperty("entity.topic", "FALCON.ENTITY.TOPIC");
+                String oozieTopic = client.getJMSTopicName(jobID);
+                if (falconTopic.equals(oozieTopic)) {
+                    return true;
+                }
+            }
+        } catch (OozieClientException e) {
+            LOG.error("Error while retrieving JMS connection info", e);
+        }
+
+        return false;
+    }
+
     private static enum JobAction {
         KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
     }
@@ -1618,6 +1639,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 instance.endTime = jobInfo.getEndTime();
             }
             instance.cluster = cluster;
+            instance.runId = jobInfo.getRun();
+            instance.status = WorkflowStatus.valueOf(jobInfo.getStatus().name());
+            instance.wfParams = getWFParams(jobInfo);
             instances[0] = instance;
             InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
                     "Instance for workflow id:" + jobId);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 9d31d17..4b74368 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -179,9 +179,6 @@ public class FalconPostProcessingTest {
             verifyMesssage(consumer);
         }
 
-        // Verify falcon message
-        verifyMesssage(consumer);
-
         connection.close();
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 785dce8..64177a4 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -239,4 +239,19 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
         // do nothing since late data does not apply for failed workflows
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing.
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index b952bbe..7aa094a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -106,9 +106,28 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
 
     @Override
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        // Re-run does not make sense on timeouts.
+        if (context.hasWorkflowTimedOut()) {
+            return;
+        }
         handleRerun(context.getClusterName(), context.getEntityType(),
                 context.getEntityName(), context.getNominalTimeAsISO8601(),
                 context.getWorkflowRunIdString(), context.getWorkflowId(),
                 context.getWorkflowUser(), context.getExecutionCompletionTime());
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index 8545ef9..466f79c 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -523,6 +523,54 @@
         </description>
     </property>
 
+    <!-- Required to Notify Falcon on Workflow job status. -->
+    <property>
+        <name>oozie.services.ext</name>
+        <value>
+            org.apache.oozie.service.JMSAccessorService,
+            org.apache.oozie.service.JMSTopicService,
+            org.apache.oozie.service.EventHandlerService
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.EventHandlerService.event.listeners</name>
+        <value>
+            org.apache.oozie.jms.JMSJobEventListener
+        </value>
+    </property>
+    <property>
+        <name>oozie.jms.producer.connection.properties</name>
+        <value>
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.name</name>
+        <value>
+            WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+        </value>
+        <description>
+            Topic options are ${username} or a fixed string which can be specified as default or for a
+            particular job type.
+            For example, to have a fixed string topic for workflows, coordinators and bundles,
+            specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+            where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+            Following example defines topic for workflow job, workflow action, coordinator job, coordinator action,
+            bundle job and bundle action
+            WORKFLOW=workflow,
+            COORDINATOR=coordinator,
+            BUNDLE=bundle
+            For jobs with no defined topic, default topic will be ${username}
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.prefix</name>
+        <value>FALCON.</value>
+        <description>
+            This can be used to add a prefix to the topic in oozie.service.JMSTopicService.topic.name. For example: oozie.
+        </description>
+    </property>
+
     <property>
         <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
         <value>*</value>