You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/08 13:36:25 UTC
falcon git commit: FALCON-1231 Improve JobCompletionNotification Service
Repository: falcon
Updated Branches:
refs/heads/master d08f8bd74 -> f4bd1e7e5
FALCON-1231 Improve JobCompletionNotification Service
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4bd1e7e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4bd1e7e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4bd1e7e
Branch: refs/heads/master
Commit: f4bd1e7e53ca4f113b78d3a5cf4e55aba99fbe93
Parents: d08f8bd
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Oct 8 16:41:59 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Oct 8 16:41:59 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/falcon/resource/InstancesResult.java | 7 +
.../falcon/catalog/CatalogPartitionHandler.java | 15 ++
.../falcon/entity/WorkflowNameBuilder.java | 69 ++++--
.../falcon/metadata/MetadataMappingService.java | 14 ++
.../falcon/workflow/WorkflowExecutionArgs.java | 3 +
.../workflow/WorkflowExecutionContext.java | 44 +++-
.../workflow/WorkflowExecutionListener.java | 31 +++
.../WorkflowJobEndNotificationService.java | 217 ++++++++++++++++---
.../workflow/engine/AbstractWorkflowEngine.java | 2 +
.../falcon/entity/TestWorkflowNameBuilder.java | 18 ++
.../workflow/WorkflowExecutionContextTest.java | 17 ++
.../WorkflowJobEndNotificationServiceTest.java | 59 +++--
messaging/pom.xml | 7 +
.../falcon/messaging/JMSMessageConsumer.java | 155 +++++++++++--
.../messaging/JMSMessageConsumerTest.java | 187 +++++++++++++---
.../src/main/conf/oozie-site.xml | 49 ++++-
.../falcon/workflow/FalconPostProcessing.java | 10 -
.../workflow/engine/OozieWorkflowEngine.java | 24 ++
.../workflow/FalconPostProcessingTest.java | 3 -
.../falcon/rerun/handler/LateRerunHandler.java | 15 ++
.../falcon/rerun/handler/RetryHandler.java | 19 ++
webapp/src/conf/oozie/conf/oozie-site.xml | 48 ++++
23 files changed, 877 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02a65a1..c845062 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
IMPROVEMENTS
+ FALCON-1231 Improve JobCompletionNotification Service(Pallavi Rao)
+
FALCON-1157 Build error when using maven 3.3.x(Venkat Ramachandran via Pallavi Rao)
FALCON-1477 Adding "-debug" option to Falcon CLI for debug statements to stdout(Narayan Periwal via Pallavi Rao)
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index 76bb4b0..e05eeeb 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -109,6 +109,9 @@ public class InstancesResult extends APIResult {
public Date endTime;
@XmlElement
+ public int runId;
+
+ @XmlElement
public String details;
@XmlElement
@@ -154,6 +157,10 @@ public class InstancesResult extends APIResult {
return endTime;
}
+ public int getRunId() {
+ return runId;
+ }
+
public InstanceAction[] getActions() {
return actions;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
index d0b09df..cccb4f8 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
@@ -295,4 +295,19 @@ public class CatalogPartitionHandler implements WorkflowExecutionListener{
public void onFailure(WorkflowExecutionContext context) throws FalconException {
//no-op
}
+
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index 6890594..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import java.util.ArrayList;
import java.util.List;
@@ -56,13 +57,13 @@ public class WorkflowNameBuilder<T extends Entity> {
}
public Tag getWorkflowTag(String workflowName) {
- return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? null
- : WorkflowName.getTagAndSuffixes(entity, workflowName).first;
+ return WorkflowName.getTagAndSuffixes(workflowName) == null ? null
+ : WorkflowName.getTagAndSuffixes(workflowName).first;
}
public String getWorkflowSuffixes(String workflowName) {
- return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? ""
- : WorkflowName.getTagAndSuffixes(entity, workflowName).second;
+ return WorkflowName.getTagAndSuffixes(workflowName) == null ? ""
+ : WorkflowName.getTagAndSuffixes(workflowName).second;
}
/**
@@ -70,6 +71,7 @@ public class WorkflowNameBuilder<T extends Entity> {
*/
public static class WorkflowName {
private static final String SEPARATOR = "_";
+ private static final Pattern WF_NAME_PATTERN;
private String prefix;
private String entityType;
@@ -77,6 +79,32 @@ public class WorkflowNameBuilder<T extends Entity> {
private String entityName;
private List<String> suffixes;
+ static {
+ StringBuilder typePattern = new StringBuilder("(");
+ for (EntityType type : EntityType.values()) {
+ typePattern.append(type.name());
+ typePattern.append("|");
+ }
+ typePattern = typePattern.deleteCharAt(typePattern.length() - 1);
+ typePattern.append(")");
+ StringBuilder tagsPattern = new StringBuilder("(");
+ for (Tag tag : Tag.values()) {
+ tagsPattern.append(tag.name());
+ tagsPattern.append("|");
+ }
+ tagsPattern = tagsPattern.deleteCharAt(tagsPattern.length() - 1);
+ tagsPattern.append(")");
+
+ String name = "([a-zA-Z][\\-a-zA-Z0-9]*)";
+
+ String suffix = "([_A-Za-z0-9-.]*)";
+
+ String namePattern = PREFIX + SEPARATOR + typePattern + SEPARATOR + tagsPattern
+ + SEPARATOR + name + suffix;
+
+ WF_NAME_PATTERN = Pattern.compile(namePattern);
+ }
+
public WorkflowName(String prefix, String entityType, String tag,
String entityName, List<String> suffixes) {
this.prefix = prefix;
@@ -100,28 +128,27 @@ public class WorkflowNameBuilder<T extends Entity> {
return builder.toString();
}
- public static Pair<Tag, String> getTagAndSuffixes(Entity entity,
- String workflowName) {
-
- StringBuilder namePattern = new StringBuilder(PREFIX + SEPARATOR
- + entity.getEntityType().name() + SEPARATOR + "(");
- for (Tag tag : Tag.values()) {
- namePattern.append(tag.name());
- namePattern.append("|");
+ public static Pair<Tag, String> getTagAndSuffixes(String workflowName) {
+ Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
+ if (matcher.matches()) {
+ matcher.reset();
+ if (matcher.find()) {
+ String tag = matcher.group(2);
+ String suffixes = matcher.group(4);
+ return new Pair<>(Tag.valueOf(tag), suffixes);
+ }
}
- namePattern = namePattern.deleteCharAt(namePattern.length() - 1);
- namePattern.append(")" + SEPARATOR + entity.getName()
- + "([_A-Za-z0-9-.]*)");
-
- Pattern pattern = Pattern.compile(namePattern.toString());
+ return null;
+ }
- Matcher matcher = pattern.matcher(workflowName);
+ public static Pair<String, EntityType> getEntityNameAndType(String workflowName) {
+ Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
if (matcher.matches()) {
matcher.reset();
if (matcher.find()) {
- String tag = matcher.group(1);
- String suffixes = matcher.group(2);
- return new Pair<Tag, String>(Tag.valueOf(tag), suffixes);
+ String type = matcher.group(1);
+ String name = matcher.group(3);
+ return new Pair<>(name, EntityType.valueOf(type));
}
}
return null;
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index ef9da45..56fbde0 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -297,6 +297,20 @@ public class MetadataMappingService
// do nothing since lineage is only recorded for successful workflow
}
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // TBD
+ }
private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 9456fb9..d2430a2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -35,6 +35,9 @@ public enum WorkflowExecutionArgs {
// where
CLUSTER_NAME("cluster", "name of the current cluster"),
OPERATION("operation", "operation like generate, delete, replicate"),
+ // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and
+ // the values in conf.
+ DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false),
// who
WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 4454239..45b6d23 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -62,12 +62,12 @@ public class WorkflowExecutionContext {
/**
* Workflow execution status.
*/
- public enum Status {SUCCEEDED, FAILED}
+ public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED}
/**
* Workflow execution type.
*/
- public enum Type {PRE_PROCESSING, POST_PROCESSING}
+ public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
/**
* Entity operations supported.
@@ -107,6 +107,10 @@ public class WorkflowExecutionContext {
return context.get(arg);
}
+ public void setValue(WorkflowExecutionArgs arg, String value) {
+ context.put(arg, value);
+ }
+
public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
return context.containsKey(arg) ? context.get(arg) : defaultValue;
}
@@ -128,10 +132,22 @@ public class WorkflowExecutionContext {
return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
}
+ public boolean hasWorkflowTimedOut() {
+ return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+ }
+
+ public boolean hasWorkflowBeenKilled() {
+ return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+ }
+
public String getContextFile() {
return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
}
+ public Status getWorkflowStatus() {
+ return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS));
+ }
+
public String getLogDir() {
return getValue(WorkflowExecutionArgs.LOG_DIR);
}
@@ -211,7 +227,10 @@ public class WorkflowExecutionContext {
}
public EntityOperations getOperation() {
- return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+ if (getValue(WorkflowExecutionArgs.OPERATION) != null) {
+ return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+ }
+ return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION));
}
public String getOutputFeedNames() {
@@ -282,6 +301,19 @@ public class WorkflowExecutionContext {
return creationTime;
}
+ public long getWorkflowStartTime() {
+ return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
+ }
+
+ public long getWorkflowEndTime() {
+ return Long.parseLong(getValue(WorkflowExecutionArgs.WF_END_TIME));
+ }
+
+
+ public Type getContextType() {
+ return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
+ }
+
/**
* this method is invoked from with in the workflow.
*
@@ -397,7 +429,11 @@ public class WorkflowExecutionContext {
}
public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) {
- wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, Type.POST_PROCESSING.name());
+ return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING);
+ }
+
+ public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties, Type type) {
+ wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
return new WorkflowExecutionContext(wfProperties);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
index 2d3a477..7bf14f2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
@@ -25,7 +25,38 @@ import org.apache.falcon.FalconException;
*/
public interface WorkflowExecutionListener {
+ /**
+ * Invoked when a workflow succeeds.
+ * @param context
+ * @throws FalconException
+ */
void onSuccess(WorkflowExecutionContext context) throws FalconException;
+ /**
+ * Invoked when a workflow fails.
+ * @param context
+ * @throws FalconException
+ */
void onFailure(WorkflowExecutionContext context) throws FalconException;
+
+ /**
+ * Invoked on start of a workflow. Basically, when the workflow is RUNNING.
+ * @param context
+ * @throws FalconException
+ */
+ void onStart(WorkflowExecutionContext context) throws FalconException;
+
+ /**
+ * Invoked when a workflow is suspended.
+ * @param context
+ * @throws FalconException
+ */
+ void onSuspend(WorkflowExecutionContext context) throws FalconException;
+
+ /**
+ * Invoked when a workflow is in waiting state.
+ * @param context
+ * @throws FalconException
+ */
+ void onWait(WorkflowExecutionContext context) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index c4f8843..5c75f5c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -21,19 +21,24 @@ package org.apache.falcon.workflow;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A workflow job end notification service.
@@ -46,11 +51,21 @@ public class WorkflowJobEndNotificationService implements FalconService {
private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
+ // Maintain a cache of context built, so we don't have to query Oozie for every state change.
+ private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
+
+ private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+
@Override
public String getName() {
return SERVICE_NAME;
}
+ // Mainly for test
+ Map<String, Properties> getContextMap() {
+ return contextMap;
+ }
+
@Override
public void init() throws FalconException {
String listenerClassNames = StartupProperties.get().getProperty(
@@ -83,9 +98,34 @@ public class WorkflowJobEndNotificationService implements FalconService {
}
public void notifyFailure(WorkflowExecutionContext context) {
+ notifyWorkflowEnd(context);
+ }
+
+ public void notifySuccess(WorkflowExecutionContext context) {
+ notifyWorkflowEnd(context);
+ }
+
+ public void notifyStart(WorkflowExecutionContext context) {
+ // Start notifications can only be from Oozie JMS notifications
+ updateContextFromWFConf(context);
+ LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
+ for (WorkflowExecutionListener listener : listeners) {
+ try {
+ listener.onStart(context);
+ } catch (Throwable t) {
+ // do not rethrow as other listeners do not get a chance
+ LOG.error("Error in listener {}", listener.getClass().getName(), t);
+ }
+ }
+ }
+
+ public void notifySuspend(WorkflowExecutionContext context) {
+ // Suspend notifications can only be from Oozie JMS notifications
+ updateContextFromWFConf(context);
+ LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
for (WorkflowExecutionListener listener : listeners) {
try {
- listener.onFailure(context);
+ listener.onSuspend(context);
} catch (Throwable t) {
// do not rethrow as other listeners do not get a chance
LOG.error("Error in listener {}", listener.getClass().getName(), t);
@@ -93,19 +133,142 @@ public class WorkflowJobEndNotificationService implements FalconService {
}
instrumentAlert(context);
+ contextMap.remove(context.getWorkflowId());
}
- public void notifySuccess(WorkflowExecutionContext context) {
+ public void notifyWait(WorkflowExecutionContext context) {
+ // Wait notifications can only be from Oozie JMS notifications
+ updateContextFromWFConf(context);
+ LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
for (WorkflowExecutionListener listener : listeners) {
try {
- listener.onSuccess(context);
+ listener.onWait(context);
} catch (Throwable t) {
// do not rethrow as other listeners do not get a chance
LOG.error("Error in listener {}", listener.getClass().getName(), t);
}
}
+ }
- instrumentAlert(context);
+ // The method retrieves the conf from the cache if it is in cache.
+ // Else, queries WF Engine to retrieve the conf of the workflow
+ private void updateContextFromWFConf(WorkflowExecutionContext context) {
+ try {
+ Properties wfProps = contextMap.get(context.getWorkflowId());
+ if (wfProps == null) {
+ Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName());
+ // Entity can be null in case of delete. Engine will generate notifications for instance kills.
+ // But, the entity would no longer be in the config store.
+ if (entity == null) {
+ return;
+ }
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ try {
+ InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine()
+ .getJobDetails(cluster, context.getWorkflowId()).getInstances();
+ if (instances != null && instances.length > 0) {
+ wfProps = getWFProps(instances[0].getWfParams());
+ // Required by RetryService. But, is not part of conf.
+ wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
+ Integer.toString(instances[0].getRunId()));
+ }
+ } catch (FalconException e) {
+ // Do Nothing. The workflow may not have been deployed on this cluster.
+ continue;
+ }
+ contextMap.put(context.getWorkflowId(), wfProps);
+ }
+ }
+
+ // No extra props to enhance the context with.
+ if (wfProps == null || wfProps.isEmpty()) {
+ return;
+ }
+
+ for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+ if (wfProps.containsKey(arg.getName())) {
+ context.setValue(arg, wfProps.getProperty(arg.getName()));
+ }
+ }
+
+ } catch (FalconException e) {
+ LOG.error("Unable to retrieve entity {} of type {} from config store.", e);
+ }
+ }
+
+ private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
+ Properties props = new Properties();
+ for (InstancesResult.KeyValuePair kv : wfParams) {
+ props.put(kv.getKey(), kv.getValue());
+ }
+ return props;
+ }
+
+
+ // This method handles both success and failure notifications.
+ private void notifyWorkflowEnd(WorkflowExecutionContext context) {
+ // Need to distinguish notification from post processing for backward compatibility
+ if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
+ boolean engineNotifEnabled = false;
+ try {
+ engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
+ .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
+ } catch (FalconException e) {
+ LOG.debug("Unable to determine if the notification is enabled on the wf engine. Assuming not.", e);
+ }
+ // Ignore the message from post processing as there will be one more from Oozie.
+ if (engineNotifEnabled) {
+ LOG.info("Ignoring message from post processing as engine notification is enabled.");
+ return;
+ } else {
+ updateContextWithTime(context);
+ }
+ } else {
+ updateContextFromWFConf(context);
+ }
+
+ LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
+
+ for (WorkflowExecutionListener listener : listeners) {
+ try {
+ if (context.hasWorkflowSucceeded()) {
+ listener.onSuccess(context);
+ instrumentAlert(context);
+ } else {
+ listener.onFailure(context);
+ if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
+ instrumentAlert(context);
+ }
+ }
+ } catch (Throwable t) {
+ // do not rethrow as other listeners do not get a chance
+ LOG.error("Error in listener {}", listener.getClass().getName(), t);
+ }
+ }
+
+ contextMap.remove(context.getWorkflowId());
+ }
+
+ // In case of notifications coming from post processing, start and end time need to be populated.
+ private void updateContextWithTime(WorkflowExecutionContext context) {
+ try {
+ InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
+ .getJobDetails(context.getClusterName(), context.getWorkflowId());
+ Date startTime = result.getInstances()[0].startTime;
+ Date endTime = result.getInstances()[0].endTime;
+ Date now = new Date();
+ if (startTime == null) {
+ startTime = now;
+ }
+ if (endTime == null) {
+ endTime = now;
+ }
+ context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
+ context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
+ } catch(FalconException e) {
+ LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster "
+ + context.getClusterName(), e);
+ }
}
private void instrumentAlert(WorkflowExecutionContext context) {
@@ -117,27 +280,31 @@ public class WorkflowJobEndNotificationService implements FalconService {
String workflowUser = context.getWorkflowUser();
String nominalTime = context.getNominalTimeAsISO8601();
String runId = String.valueOf(context.getWorkflowRunId());
+ Date now = new Date();
+ // Start and/or End time may not be set in case of workflow suspend
+ Date endTime;
+ if (context.getWorkflowEndTime() == 0) {
+ endTime = now;
+ } else {
+ endTime = new Date(context.getWorkflowEndTime());
+ }
- try {
- CurrentUser.authenticate(context.getWorkflowUser());
- AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
- Date startTime = result.getInstances()[0].startTime;
- Date endTime = result.getInstances()[0].endTime;
- Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+ Date startTime;
+ if (context.getWorkflowStartTime() == 0) {
+ startTime = now;
+ } else {
+ startTime = new Date(context.getWorkflowStartTime());
+ }
+ Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
- if (context.hasWorkflowFailed()) {
- GenericAlert.instrumentFailedInstance(clusterName, entityType,
- entityName, nominalTime, workflowId, workflowUser, runId, operation,
- SchemaHelper.formatDateUTC(startTime), "", "", duration);
- } else {
- GenericAlert.instrumentSucceededInstance(clusterName, entityType,
- entityName, nominalTime, workflowId, workflowUser, runId, operation,
- SchemaHelper.formatDateUTC(startTime), duration);
- }
- } catch (FalconException e) {
- // Logging an error and ignoring since there are listeners for extensions
- LOG.error("Instrumenting alert failed for: " + context, e);
+ if (context.hasWorkflowFailed()) {
+ GenericAlert.instrumentFailedInstance(clusterName, entityType,
+ entityName, nominalTime, workflowId, workflowUser, runId, operation,
+ SchemaHelper.formatDateUTC(startTime), "", "", duration);
+ } else {
+ GenericAlert.instrumentSucceededInstance(clusterName, entityType,
+ entityName, nominalTime, workflowId, workflowUser, runId, operation,
+ SchemaHelper.formatDateUTC(startTime), duration);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 0b560bb..8b3460a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -105,4 +105,6 @@ public abstract class AbstractWorkflowEngine {
public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
List<LifeCycle> lifeCycles) throws FalconException;
+
+ public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
index 6060731..5b1af78 100644
--- a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
+++ b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java
@@ -17,10 +17,13 @@
*/
package org.apache.falcon.entity;
+import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -88,4 +91,19 @@ public class TestWorkflowNameBuilder {
"FALCON_PROCESS_DEFAULT_agg-logs");
}
+
+ @Test(dataProvider = "workflowNames")
+ public void workflowNameTypeTest(String wfName, Pair<String, EntityType> nameType) {
+ Assert.assertEquals(WorkflowNameBuilder.WorkflowName.getEntityNameAndType(wfName), nameType);
+ }
+
+ @DataProvider(name = "workflowNames")
+ public Object[][] getWorkflowNames() {
+ return new Object[][] {
+ {"FALCON_PROCESS_DEFAULT_agg-logs", new Pair<>("agg-logs", EntityType.PROCESS)},
+ {"FALCON_FEED_REPLICATION_raw-logs", new Pair<>("raw-logs", EntityType.FEED)},
+ {"FALCON_FEED_RETENTION_logs2", new Pair<>("logs2", EntityType.FEED)},
+ {"FALCON_FEED_REPLICATION_logs_colo1", new Pair<>("logs", EntityType.FEED)},
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 65a057d..4fdc1e9 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -24,6 +24,8 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.Date;
+
/**
* A test for WorkflowExecutionContext.
@@ -86,6 +88,7 @@ public class WorkflowExecutionContextTest {
@Test
public void testHasWorkflowSucceeded() throws Exception {
Assert.assertTrue(context.hasWorkflowSucceeded());
+ Assert.assertEquals(context.getWorkflowStatus(), WorkflowExecutionContext.Status.SUCCEEDED);
}
@Test
@@ -240,6 +243,18 @@ public class WorkflowExecutionContextTest {
}
@Test
+ public void testWorkflowStartEnd() throws Exception {
+ Assert.assertEquals(context.getWorkflowEndTime() - context.getWorkflowStartTime(), 1000000);
+ }
+
+ @Test
+ public void testSetAndGetValue() throws Exception {
+ context.setValue(WorkflowExecutionArgs.RUN_ID, "10");
+ Assert.assertEquals(context.getValue(WorkflowExecutionArgs.RUN_ID), "10");
+ context.setValue(WorkflowExecutionArgs.RUN_ID, "1");
+ }
+
+ @Test
public void testSerializeDeserialize() throws Exception {
String contextFile = context.getContextFile();
context.serialize();
@@ -318,6 +333,8 @@ public class WorkflowExecutionContextTest {
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
"-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+ "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()),
+ "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000),
};
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index b7df443..1a9597b 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -22,10 +22,13 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.util.StartupProperties;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.util.Date;
+import java.util.Properties;
+
/**
* A test for WorkflowJobEndNotificationService.
*/
@@ -55,15 +58,17 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
private WorkflowJobEndNotificationService service;
private WorkflowExecutionContext savedContext;
- @BeforeMethod
+ @BeforeClass
public void setUp() throws Exception {
service = new WorkflowJobEndNotificationService();
savedContext = WorkflowExecutionContext.create(getTestMessageArgs(),
WorkflowExecutionContext.Type.POST_PROCESSING);
Assert.assertNotNull(savedContext);
+ service.init();
+ service.registerListener(this);
}
- @AfterMethod
+ @AfterClass
public void tearDown() throws Exception {
service.destroy();
}
@@ -73,29 +78,30 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
Assert.assertEquals(service.getName(), WorkflowJobEndNotificationService.SERVICE_NAME);
}
- @Test
- public void testInit() throws Exception {
- String listenerClassNames = StartupProperties.get().getProperty(
- "workflow.execution.listeners");
- Assert.assertEquals(listenerClassNames, "");
-
+ @Test(priority = -1)
+ public void testBasic() throws Exception {
try {
- StartupProperties.get().setProperty("workflow.execution.listeners",
- "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
- listenerClassNames = StartupProperties.get().getProperty(
- "workflow.execution.listeners");
- Assert.assertEquals(listenerClassNames,
- "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
-
- service.init();
notifyFailure(savedContext);
notifySuccess(savedContext);
} finally {
- service.unregisterListener(this);
StartupProperties.get().setProperty("workflow.execution.listeners", "");
}
}
+ @Test
+ public void testNotificationsFromEngine() throws FalconException {
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+ WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+ // Pretend the start was already notified
+ Properties wfProps = new Properties();
+ wfProps.put(WorkflowExecutionArgs.CLUSTER_NAME.name(), CLUSTER_NAME);
+ service.getContextMap().put("workflow-01-00", wfProps);
+
+ // Should retrieve from cache.
+ service.notifySuspend(context);
+ }
+
@Override
public void onSuccess(WorkflowExecutionContext context) throws FalconException {
Assert.assertNotNull(context);
@@ -108,6 +114,19 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
Assert.assertEquals(context.entrySet().size(), 28);
}
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+
+ }
+
private void notifyFailure(WorkflowExecutionContext context) {
service.notifyFailure(context);
}
@@ -150,6 +169,8 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
"-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+ "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()),
+ "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000),
};
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 918de63..e64a3f8 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -176,5 +176,12 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index d3178fb..bbb5d9b 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -20,12 +20,17 @@ package org.apache.falcon.messaging;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.WorkflowNameBuilder;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.messaging.util.MessagingUtil;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.json.JSONException;
+import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,12 +42,17 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.lang.reflect.InvocationTargetException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.TimeZone;
/**
* Subscribes to the falcon topic for handling retries and alerts.
@@ -94,22 +104,22 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
@Override
public void onMessage(Message message) {
- MapMessage mapMessage = (MapMessage) message;
LOG.info("Received JMS message {}", message.toString());
-
try {
- WorkflowExecutionContext context = createContext(mapMessage);
- LOG.info("Created context from JMS message {}", context);
-
- // Login the user so listeners can access FS and WfEngine as this user
- CurrentUser.authenticate(context.getWorkflowUser());
-
- if (context.hasWorkflowFailed()) {
- onFailure(context);
- } else if (context.hasWorkflowSucceeded()) {
- onSuccess(context);
+ if (message instanceof MapMessage) {
+ MapMessage mapMessage = (MapMessage) message;
+ WorkflowExecutionContext context = createContext(mapMessage);
+ LOG.info("Created context from Falcon JMS message {}", context);
+ invokeListener(context);
+ // Due to backward compatibility, need to handle messages from post processing too.
+ // Hence cannot use JMS selectors.
+ } else if (shouldHandle(message)) {
+ TextMessage textMessage = (TextMessage) message;
+ WorkflowExecutionContext context = createContext(textMessage);
+ LOG.info("Created context from Oozie JMS message {}", context);
+ invokeListener(context);
}
- } catch (JMSException e) {
+ } catch (Exception e) {
String errorMessage = "Error in onMessage for topicSubscriber of topic: "
+ topicName + ", Message: " + message.toString();
LOG.info(errorMessage, e);
@@ -117,6 +127,117 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
}
}
+    // Creates a context from the JMS notification published by the workflow engine.
+    // The message body is a JSON document; the "appName", "appType" and "user" message properties
+    // supply the entity name/type, the context type and the workflow user respectively.
+    // Throws JMSException if the body or a property cannot be read, and FalconException if the
+    // body is not the expected JSON.
+    private WorkflowExecutionContext createContext(TextMessage message) throws JMSException, FalconException {
+        try {
+            // Example Workflow Job in FAILED state:
+            // {"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved",
+            // "id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}
+            JSONObject json = new JSONObject(message.getText());
+            // Used for both start and end time when the notification does not carry them.
+            String currentTime = Long.toString(System.currentTimeMillis());
+            Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<>();
+            wfProperties.put(WorkflowExecutionArgs.STATUS, json.getString("status"));
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_ID, json.getString("id"));
+            // Timestamps arrive as JSON numbers (see the example above). Read them with getLong so the
+            // result does not depend on getString coercing non-string values.
+            wfProperties.put(WorkflowExecutionArgs.WF_START_TIME,
+                    json.isNull("startTime") ? currentTime : Long.toString(json.getLong("startTime")));
+            wfProperties.put(WorkflowExecutionArgs.WF_END_TIME,
+                    json.isNull("endTime") ? currentTime : Long.toString(json.getLong("endTime")));
+            if (!json.isNull("nominalTime")) {
+                wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
+                        getNominalTimeString(json.getLong("nominalTime")));
+            }
+            Pair<String, EntityType> entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                    message.getStringProperty("appName"));
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
+
+            String appType = message.getStringProperty("appType");
+            return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
+
+        } catch (JSONException e) {
+            throw new FalconException("Unable to build a context from the JMS message.", e);
+        }
+    }
+
+    // Renders an epoch-millisecond timestamp as a UTC string in the "yyyy-MM-dd-HH-mm" pattern.
+    private String getNominalTimeString(long timeInMillis) {
+        // A fresh formatter per call: SimpleDateFormat instances must not be shared across threads.
+        SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
+        utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return utcFormat.format(new Date(timeInMillis));
+    }
+
+    // Authenticates as the workflow user, then forwards the context to the notification service
+    // method that matches the status carried in the context.
+    private void invokeListener(WorkflowExecutionContext context) {
+        // Login the user so listeners can access FS and WfEngine as this user
+        CurrentUser.authenticate(context.getWorkflowUser());
+
+        String statusName = context.getValue(WorkflowExecutionArgs.STATUS);
+        WorkflowExecutionContext.Status status = WorkflowExecutionContext.Status.valueOf(statusName);
+
+        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
+            // Handle only timeout and wait notifications of coord; every other status is ignored.
+            if (status == WorkflowExecutionContext.Status.TIMEDOUT) {
+                jobEndNotificationService.notifyFailure(context);
+            } else if (status == WorkflowExecutionContext.Status.WAITING) {
+                jobEndNotificationService.notifyWait(context);
+            }
+            return;
+        }
+
+        switch (status) {
+        case RUNNING:
+            jobEndNotificationService.notifyStart(context);
+            break;
+        case SUSPENDED:
+            jobEndNotificationService.notifySuspend(context);
+            break;
+        case SUCCEEDED:
+            jobEndNotificationService.notifySuccess(context);
+            break;
+        case FAILED:
+        case KILLED:
+            // Both terminal error states are reported as a failure.
+            jobEndNotificationService.notifyFailure(context);
+            break;
+        default:
+            throw new IllegalArgumentException("Not valid Status of workflow");
+        }
+    }
+
+    // Since Oozie has a system level JMS connection info, Falcon should ensure it is handling notifications
+    // of Falcon entities only.
+    // Returns true for workflow job notifications of Falcon workflows, and for coordinator action
+    // notifications of Falcon workflows whose eventStatus is WAITING or FAILURE. Returns false for
+    // everything else, including messages whose headers cannot be read.
+    private boolean shouldHandle(Message message) {
+        try {
+            String appType = message.getStringProperty("appType");
+            // Compare by name instead of Type.valueOf(appType): valueOf throws IllegalArgumentException
+            // for an app type that is not a WorkflowExecutionContext.Type constant, and such messages
+            // should simply be skipped.
+            boolean isWorkflowJob = WorkflowExecutionContext.Type.WORKFLOW_JOB.name().equals(appType);
+            boolean isCoordAction = WorkflowExecutionContext.Type.COORDINATOR_ACTION.name().equals(appType);
+            if (!isWorkflowJob && !isCoordAction) {
+                return false;
+            }
+
+            // Only names that resolve to a Falcon entity belong to Falcon workflows.
+            String appName = message.getStringProperty("appName");
+            if (WorkflowNameBuilder.WorkflowName.getEntityNameAndType(appName) == null) {
+                return false;
+            }
+
+            // Handle all workflow job notifications for falcon workflows
+            if (isWorkflowJob) {
+                return true;
+            }
+
+            // Handle coord notifications only for eventStatus WAITING and FAILURE; invokeListener
+            // later narrows FAILURE down to TIMEDOUT actions.
+            String status = message.getStringProperty("eventStatus");
+            return "WAITING".equals(status) || "FAILURE".equals(status);
+        } catch (JMSException e) {
+            LOG.error("Error while parsing the message header", e);
+            return false;
+        }
+    }
+
private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
// for backwards compatibility, read all args from message
Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
@@ -130,14 +251,6 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
return WorkflowExecutionContext.create(wfProperties);
}
- public void onFailure(WorkflowExecutionContext context) {
- jobEndNotificationService.notifyFailure(context);
- }
-
- public void onSuccess(WorkflowExecutionContext context) {
- jobEndNotificationService.notifySuccess(context);
- }
-
@Override
public void onException(JMSException ignore) {
String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 7356ee3..dee7c47 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -26,10 +26,11 @@ import org.apache.falcon.util.FalconTestUtil;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.mockito.Mockito;
import org.mortbay.log.Log;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.jms.Connection;
@@ -37,7 +38,9 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
+import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.IOException;
import java.util.HashMap;
@@ -54,17 +57,26 @@ public class JMSMessageConsumerTest {
private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
private BrokerService broker;
- @BeforeClass
+ private JMSMessageConsumer subscriber;
+ private WorkflowJobEndNotificationService jobEndService;
+
+ @BeforeMethod
public void setup() throws Exception {
broker = new BrokerService();
broker.addConnector(BROKER_URL);
broker.setDataDirectory("target/activemq");
broker.setBrokerName("localhost");
+ jobEndService = Mockito.mock(WorkflowJobEndNotificationService.class);
broker.start();
- broker.deleteAllMessages();
+ //Comma separated topics are supported in startup properties
+ subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
+ BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService);
+
+ subscriber.startSubscriber();
}
- public void sendMessages(String topic) throws JMSException, FalconException, IOException {
+ public void sendMessages(String topic, WorkflowExecutionContext.Type type)
+ throws JMSException, FalconException, IOException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
@@ -74,32 +86,100 @@ public class JMSMessageConsumerTest {
javax.jms.MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < 3; i++) {
- WorkflowExecutionContext context = WorkflowExecutionContext.create(
- getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
+ for (int i = 0; i < 5; i++) {
+ Message message = null;
- MapMessage message = session.createMapMessage();
- for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
- message.setString(entry.getKey().getName(), entry.getValue());
+ switch(type) {
+ case POST_PROCESSING:
+ message = getMockFalconMessage(i, session);
+ break;
+ case WORKFLOW_JOB:
+ message = getMockOozieMessage(i, session);
+ break;
+ case COORDINATOR_ACTION:
+ message = getMockOozieCoordMessage(i, session);
+ default:
+ break;
}
-
Log.debug("Sending:" + message);
producer.send(message);
}
+ }
- WorkflowExecutionContext context = WorkflowExecutionContext.create(
- getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
-
- MapMessage mapMessage = session.createMapMessage();
- for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
- mapMessage.setString(entry.getKey().getName(), entry.getValue());
+ private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
+ TextMessage message = session.createTextMessage();
+ message.setStringProperty("appType", "WORKFLOW_JOB");
+ message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+ message.setStringProperty("user", "falcon");
+ switch(i % 4) {
+ case 0:
+ message.setText("{\"status\":\"RUNNING\",\"id\":\"0000042-130618221729631-oozie-oozi-W\""
+ + ",\"startTime\":1342915200000}");
+ break;
+ case 1:
+ message.setText("{\"status\":\"FAILED\",\"errorCode\":\"EL_ERROR\","
+ + "\"errorMessage\":\"variable [dummyvalue] cannot be resolved\","
+ + "\"id\":\"0000042-130618221729631-oozie-oozi-W\",\"startTime\":1342915200000,"
+ + "\"endTime\":1366672183543}");
+ break;
+ case 2:
+ message.setText("{\"status\":\"SUCCEEDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\""
+ + ",\"startTime\":1342915200000,"
+ + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\",\"endTime\":1366676224154}");
+ break;
+ case 3:
+ message.setText("{\"status\":\"SUSPENDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\","
+ + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\"}");
+ break;
+ default:
}
+ return message;
+ }
- Log.debug("Sending:" + mapMessage);
- producer.send(mapMessage);
+ private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException {
+ TextMessage message = session.createTextMessage();
+ message.setStringProperty("appType", "COORDINATOR_ACTION");
+ message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+ message.setStringProperty("user", "falcon");
+ switch(i % 5) {
+ case 0:
+ message.setText("{\"status\":\"WAITING\",\"nominalTime\":1310342400000,\"missingDependency\""
+ + ":\"hdfs://gsbl90107.blue.com:8020/user/john/dir1/file1\","
+ + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+ + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+ message.setStringProperty("eventStatus", "WAITING");
+ break;
+ case 1:
+ message.setText("{\"status\":\"RUNNING\",\"nominalTime\":1310342400000,"
+ + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+ + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+ message.setStringProperty("eventStatus", "STARTED");
+ break;
+ case 2:
+ message.setText("{\"status\":\"SUCCEEDED\",\"nominalTime\":1310342400000,"
+ + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+ + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\","
+ + "\"endTime\":1366677082799}");
+ message.setStringProperty("eventStatus", "SUCCESS");
+ break;
+ case 3:
+ message.setText("{\"status\":\"FAILED\",\"errorCode\":\"E0101\",\"errorMessage\":"
+ + "\"dummyError\",\"nominalTime\":1310342400000,"
+ + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+ + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+ message.setStringProperty("eventStatus", "FAILURE");
+ break;
+ case 4:
+ message.setText("{\"status\":\"TIMEDOUT\",\"nominalTime\":1310342400000,"
+ + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+ + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+ message.setStringProperty("eventStatus", "FAILURE");
+ default:
+ }
+ return message;
}
- private String[] getMockFalconMessage(int i) {
+ private Message getMockFalconMessage(int i, Session session) throws FalconException, JMSException {
Map<String, String> message = new HashMap<String, String>();
message.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS);
message.put(WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL);
@@ -127,39 +207,78 @@ public class JMSMessageConsumerTest {
args[index++] = entry.getValue();
}
- return args;
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(
+ args, WorkflowExecutionContext.Type.POST_PROCESSING);
+
+ MapMessage jmsMessage = session.createMapMessage();
+ for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
+ jmsMessage.setString(entry.getKey().getName(), entry.getValue());
+ }
+
+ return jmsMessage;
}
@Test
public void testSubscriber() {
try {
- //Comma separated topics are supported in startup properties
- JMSMessageConsumer subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
- BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
- subscriber.startSubscriber();
- sendMessages(TOPIC_NAME);
+ sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
final BrokerView adminView = broker.getAdminView();
Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
- Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
- sendMessages(SECONDARY_TOPIC_NAME);
+ sendMessages(SECONDARY_TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
Assert.assertEquals(adminView.getTotalEnqueueCount(), 18);
Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
-
- subscriber.closeSubscriber();
} catch (Exception e) {
Assert.fail("This should not have thrown an exception.", e);
}
}
- @AfterClass
- public void tearDown() throws Exception {
- broker.deleteAllMessages();
+ @Test
+ public void testJMSMessagesFromOozie() throws Exception {
+ sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+ final BrokerView adminView = broker.getAdminView();
+
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+ // Async operations. Give some time for messages to be processed.
+ Thread.sleep(100);
+ Mockito.verify(jobEndService, Mockito.times(2)).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+ }
+
+ @Test
+ public void testJMSMessagesForOozieCoord() throws Exception {
+ sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.COORDINATOR_ACTION);
+
+ final BrokerView adminView = broker.getAdminView();
+
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 12);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+ // Async operations. Give some time for messages to be processed.
+ Thread.sleep(100);
+ Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService, Mockito.times(1)).notifyWait(Mockito.any(WorkflowExecutionContext.class));
+ Mockito.verify(jobEndService, Mockito.times(1)).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception{
broker.stop();
+ subscriber.closeSubscriber();
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie-el-extensions/src/main/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/conf/oozie-site.xml b/oozie-el-extensions/src/main/conf/oozie-site.xml
index 0925b41..5ef7f2a 100644
--- a/oozie-el-extensions/src/main/conf/oozie-site.xml
+++ b/oozie-el-extensions/src/main/conf/oozie-site.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -205,5 +205,52 @@
EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
</description>
</property>
+ <!-- Required to Notify Falcon on Workflow job status. -->
+ <property>
+ <name>oozie.services.ext</name>
+ <value>
+ org.apache.oozie.service.JMSAccessorService,
+ org.apache.oozie.service.JMSTopicService,
+ org.apache.oozie.service.EventHandlerService
+ </value>
+ </property>
+ <property>
+ <name>oozie.service.EventHandlerService.event.listeners</name>
+ <value>
+ org.apache.oozie.jms.JMSJobEventListener
+ </value>
+ </property>
+ <property>
+ <name>oozie.jms.producer.connection.properties</name>
+ <value>
+ java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+ </value>
+ </property>
+ <property>
+ <name>oozie.service.JMSTopicService.topic.name</name>
+ <value>
+ WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+ </value>
+ <description>
+ Topic options are ${username} or a fixed string which can be specified as default or for a
+ particular job type.
+ For example, to have a fixed string topic for workflows, coordinators and bundles,
+ specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+ where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+ Following example defines topic for workflow job, workflow action, coordinator job, coordinator action,
+ bundle job and bundle action
+ WORKFLOW=workflow,
+ COORDINATOR=coordinator,
+ BUNDLE=bundle
+ For jobs with no defined topic, default topic will be ${username}
+ </description>
+ </property>
+ <property>
+ <name>oozie.service.JMSTopicService.topic.prefix</name>
+ <value>FALCON.</value>
+ <description>
+ This can be used to add a prefix to the topic in oozie.service.JMSTopicService.topic.name, e.g. "oozie.".
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index cff1187..4961896 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -62,9 +62,6 @@ public class FalconPostProcessing extends Configured implements Tool {
LOG.info("Moving logs {}", context);
invokeLogProducer(context);
- LOG.info("Sending falcon message {}", context);
- invokeFalconMessageProducer(context);
-
return 0;
}
@@ -75,13 +72,6 @@ public class FalconPostProcessing extends Configured implements Tool {
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}
- private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
- JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
- .type(JMSMessageProducer.MessageType.FALCON)
- .build();
- jmsMessageProducer.sendMessage();
- }
-
private void invokeLogProducer(WorkflowExecutionContext context) {
// todo: need to move this out to Falcon in-process
if (UserGroupInformation.isSecurityEnabled()) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 6660af1..09c29ab 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.JMSConnectionInfo;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.OozieClient;
@@ -589,6 +591,25 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
}
+    /**
+     * Checks whether the workflow engine publishes JMS notifications for the given job on Falcon's topic.
+     *
+     * @param cluster cluster used to look up the Oozie client
+     * @param jobID id of the job whose JMS topic is compared with the "entity.topic" startup property
+     * @return true if Oozie reports non-empty JMS connection properties and the job's topic equals
+     *         Falcon's entity topic; false otherwise, including when the Oozie call fails
+     * @throws FalconException declared by the signature; Oozie client errors are logged, not rethrown
+     */
+    @Override
+    public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+        OozieClient client = OozieClientFactory.get(cluster);
+        try {
+            JMSConnectionInfo connectionInfo = client.getJMSConnectionInfo();
+            if (connectionInfo == null || connectionInfo.getJNDIProperties().isEmpty()) {
+                return false;
+            }
+            String expectedTopic = StartupProperties.get().getProperty("entity.topic", "FALCON.ENTITY.TOPIC");
+            String jobTopic = client.getJMSTopicName(jobID);
+            return expectedTopic.equals(jobTopic);
+        } catch (OozieClientException e) {
+            LOG.error("Error while retrieving JMS connection info", e);
+            return false;
+        }
+    }
+
private static enum JobAction {
KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
}
@@ -1618,6 +1639,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
instance.endTime = jobInfo.getEndTime();
}
instance.cluster = cluster;
+ instance.runId = jobInfo.getRun();
+ instance.status = WorkflowStatus.valueOf(jobInfo.getStatus().name());
+ instance.wfParams = getWFParams(jobInfo);
instances[0] = instance;
InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
"Instance for workflow id:" + jobId);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 9d31d17..4b74368 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -179,9 +179,6 @@ public class FalconPostProcessingTest {
verifyMesssage(consumer);
}
- // Verify falcon message
- verifyMesssage(consumer);
-
connection.close();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 785dce8..64177a4 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -239,4 +239,19 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
public void onFailure(WorkflowExecutionContext context) throws FalconException {
// do nothing since late data does not apply for failed workflows
}
+
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // do nothing
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // do nothing
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing.
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index b952bbe..7aa094a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -106,9 +106,28 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
@Override
public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ // Re-run does not make sense on timeouts.
+ if (context.hasWorkflowTimedOut()) {
+ return;
+ }
handleRerun(context.getClusterName(), context.getEntityType(),
context.getEntityName(), context.getNominalTimeAsISO8601(),
context.getWorkflowRunIdString(), context.getWorkflowId(),
context.getWorkflowUser(), context.getExecutionCompletionTime());
}
+
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index 8545ef9..466f79c 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -523,6 +523,54 @@
</description>
</property>
+ <!-- Required to Notify Falcon on Workflow job status. -->
+ <property>
+ <name>oozie.services.ext</name>
+ <value>
+ org.apache.oozie.service.JMSAccessorService,
+ org.apache.oozie.service.JMSTopicService,
+ org.apache.oozie.service.EventHandlerService
+ </value>
+ </property>
+ <property>
+ <name>oozie.service.EventHandlerService.event.listeners</name>
+ <value>
+ org.apache.oozie.jms.JMSJobEventListener
+ </value>
+ </property>
+ <property>
+ <name>oozie.jms.producer.connection.properties</name>
+ <value>
+ java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+ </value>
+ </property>
+ <property>
+ <name>oozie.service.JMSTopicService.topic.name</name>
+ <value>
+ WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+ </value>
+ <description>
+ Topic options are ${username} or a fixed string which can be specified as default or for a
+ particular job type.
+ For example, to have a fixed string topic for workflows, coordinators and bundles,
+ specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+ where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+ The following example defines topics for workflow job, workflow action, coordinator job, coordinator action,
+ bundle job and bundle action:
+ WORKFLOW=workflow,
+ COORDINATOR=coordinator,
+ BUNDLE=bundle
+ For jobs with no defined topic, the default topic will be ${username}.
+ </description>
+ </property>
+ <property>
+ <name>oozie.service.JMSTopicService.topic.prefix</name>
+ <value>FALCON.</value>
+ <description>
+ This can be used to add a prefix to the topic in oozie.service.JMSTopicService.topic.name. For example: oozie.
+ </description>
+ </property>
+
<property>
<name>oozie.service.HadoopAccessorService.supported.filesystems</name>
<value>*</value>