You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/07/15 08:56:32 UTC
falcon git commit: FALCON-2060 Retry does not happen if instance timed out
Repository: falcon
Updated Branches:
refs/heads/master 609fc5bc1 -> 0da207404
FALCON-2060 Retry does not happen if instance timed out
Retries for timed-out instances broke when we moved from workflow (wf) rerun to coordinator (coord) rerun.
Author: Pallavi Rao <pa...@inmobi.com>
Reviewers: @peeyushb, @sandeepSamudrala
Closes #228 from pallavi-rao/2060
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0da20740
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0da20740
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0da20740
Branch: refs/heads/master
Commit: 0da2074040b95c64b49ab6fb65e57644bb022e55
Parents: 609fc5b
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Jul 15 14:26:19 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jul 15 14:26:19 2016 +0530
----------------------------------------------------------------------
.../falcon/workflow/WorkflowExecutionContext.java | 4 ++--
.../workflow/WorkflowJobEndNotificationService.java | 4 +++-
.../falcon/workflow/engine/OozieWorkflowEngine.java | 2 ++
.../org/apache/falcon/rerun/handler/RetryConsumer.java | 6 +++---
.../org/apache/falcon/rerun/handler/RetryHandler.java | 12 ++++++------
5 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9b1e1f4..9b011b8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -304,11 +304,11 @@ public class WorkflowExecutionContext {
}
public int getWorkflowRunId() {
- return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID));
+ return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0"));
}
public String getWorkflowRunIdString() {
- return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)));
+ return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0")));
}
public String getWorkflowUser() {
http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fbd1e3f..6d1332e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -155,6 +155,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
Properties wfProps = contextMap.get(context.getWorkflowId());
if (wfProps == null) {
+ wfProps = new Properties();
Entity entity = null;
try {
entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
@@ -166,11 +167,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
return false;
}
for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ wfProps.setProperty(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster);
try {
InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
.getJobDetails(cluster, context.getWorkflowId()).getInstances();
if (instances != null && instances.length > 0) {
- wfProps = getWFProps(instances[0].getWfParams());
+ wfProps.putAll(getWFProps(instances[0].getWfParams()));
// Required by RetryService. But, is not part of conf.
wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
Integer.toString(instances[0].getRunId()));
http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ecbe7ee..9a09f18 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1603,6 +1603,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
} else if (jobId.endsWith("-B")) {
BundleJob bundle = client.getBundleJobInfo(jobId);
return bundle.getStatus().name();
+ } else if (jobId.contains("-C@")) {
+ return client.getCoordActionInfo(jobId).getStatus().name();
}
throw new IllegalArgumentException("Unhandled jobs id: " + jobId);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 4c763c2..3cad362 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -59,9 +59,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
(message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
// Use coord action id for rerun if available
- String id = message.getParentId();
- if (StringUtils.isBlank(id)) {
- id = message.getWfId();
+ String id = message.getWfId();
+ if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) {
+ id = message.getParentId();
}
handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index c691922..b8adeef 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -110,17 +110,17 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
@Override
public void onFailure(WorkflowExecutionContext context) throws FalconException {
- // Re-run does not make sense when killed by user.
- if (context.isWorkflowKilledManually()) {
- LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. Will not retry.",
- context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName());
- return;
- } else if (context.hasWorkflowTimedOut()) {
+ if (context.hasWorkflowTimedOut()) {
Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
Retry retry = getRetry(entity);
if (!retry.isOnTimeout()) {
return;
}
+ // Re-run does not make sense when killed by user.
+ } else if (context.isWorkflowKilledManually()) {
+ LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. Will not retry.",
+ context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName());
+ return;
}
handleRerun(context.getClusterName(), context.getEntityType(),
context.getEntityName(), context.getNominalTimeAsISO8601(),