You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/07/15 08:56:32 UTC

falcon git commit: FALCON-2060 Retry does not happen if instance timedout

Repository: falcon
Updated Branches:
  refs/heads/master 609fc5bc1 -> 0da207404


FALCON-2060 Retry does not happen if instance timedout

Retries for timed-out instances got broken when we moved from wf rerun to coord rerun.

Author: Pallavi Rao <pa...@inmobi.com>

Reviewers: @peeyushb, @sandeepSamudrala

Closes #228 from pallavi-rao/2060


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0da20740
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0da20740
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0da20740

Branch: refs/heads/master
Commit: 0da2074040b95c64b49ab6fb65e57644bb022e55
Parents: 609fc5b
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Jul 15 14:26:19 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jul 15 14:26:19 2016 +0530

----------------------------------------------------------------------
 .../falcon/workflow/WorkflowExecutionContext.java       |  4 ++--
 .../workflow/WorkflowJobEndNotificationService.java     |  4 +++-
 .../falcon/workflow/engine/OozieWorkflowEngine.java     |  2 ++
 .../org/apache/falcon/rerun/handler/RetryConsumer.java  |  6 +++---
 .../org/apache/falcon/rerun/handler/RetryHandler.java   | 12 ++++++------
 5 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9b1e1f4..9b011b8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -304,11 +304,11 @@ public class WorkflowExecutionContext {
     }
 
     public int getWorkflowRunId() {
-        return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID));
+        return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0"));
     }
 
     public String getWorkflowRunIdString() {
-        return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)));
+        return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0")));
     }
 
     public String getWorkflowUser() {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fbd1e3f..6d1332e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -155,6 +155,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
     private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
         Properties wfProps = contextMap.get(context.getWorkflowId());
         if (wfProps == null) {
+            wfProps = new Properties();
             Entity entity = null;
             try {
                 entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
@@ -166,11 +167,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
                 return false;
             }
             for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                wfProps.setProperty(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster);
                 try {
                     InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
                             .getJobDetails(cluster, context.getWorkflowId()).getInstances();
                     if (instances != null && instances.length > 0) {
-                        wfProps = getWFProps(instances[0].getWfParams());
+                        wfProps.putAll(getWFProps(instances[0].getWfParams()));
                         // Required by RetryService. But, is not part of conf.
                         wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
                                 Integer.toString(instances[0].getRunId()));

http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ecbe7ee..9a09f18 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1603,6 +1603,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             } else if (jobId.endsWith("-B")) {
                 BundleJob bundle = client.getBundleJobInfo(jobId);
                 return bundle.getStatus().name();
+            } else if (jobId.contains("-C@")) {
+                return client.getCoordActionInfo(jobId).getStatus().name();
             }
             throw new IllegalArgumentException("Unhandled jobs id: " + jobId);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 4c763c2..3cad362 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -59,9 +59,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
             // Use coord action id for rerun if available
-            String id = message.getParentId();
-            if (StringUtils.isBlank(id)) {
-                id = message.getWfId();
+            String id = message.getWfId();
+            if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) {
+                id = message.getParentId();
             }
             handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index c691922..b8adeef 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -110,17 +110,17 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
 
     @Override
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
-        // Re-run does not make sense when killed by user.
-        if (context.isWorkflowKilledManually()) {
-            LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. Will not retry.",
-                    context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName());
-            return;
-        } else if (context.hasWorkflowTimedOut()) {
+        if (context.hasWorkflowTimedOut()) {
             Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
             Retry retry = getRetry(entity);
             if (!retry.isOnTimeout()) {
                 return;
             }
+        // Re-run does not make sense when killed by user.
+        } else if (context.isWorkflowKilledManually()) {
+            LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. Will not retry.",
+                    context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName());
+            return;
         }
         handleRerun(context.getClusterName(), context.getEntityType(),
                 context.getEntityName(), context.getNominalTimeAsISO8601(),