You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2014/12/26 13:10:53 UTC

incubator-falcon git commit: FALCON-935. Feed and process late rerun failed in falcon trunk. Contributed by Shaik Idris Ali

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 14686731f -> df66fb02b


FALCON-935. Feed and process late rerun failed in falcon trunk. Contributed by Shaik Idris Ali


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/df66fb02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/df66fb02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/df66fb02

Branch: refs/heads/master
Commit: df66fb02ba055fa37699bed12e11831bc41a61b8
Parents: 1468673
Author: Suhas V <su...@inmobi.com>
Authored: Fri Dec 26 17:40:38 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Fri Dec 26 17:40:38 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 21 ++++++++------------
 .../falcon/rerun/handler/LateRerunConsumer.java |  2 +-
 .../falcon/rerun/handler/RetryConsumer.java     |  3 +--
 5 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 820a368..2316094 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-935 Feed and process late rerun failed in falcon trunk
+   (Shaik Idris Ali via Suhas Vasu)
+
    FALCON-914 Add option to search for Entities. (Ajay Yadav via Srikanth
    Sundarrajan) 
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index bbf5a30..f5b142b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -59,7 +59,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String delete(Entity entity, String cluster) throws FalconException;
 
-    public abstract void reRun(String cluster, String wfId, Properties props) throws FalconException;
+    public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException;
 
     public abstract void dryRun(Entity entity, String clusterName) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 62c1457..f1f96b1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -69,7 +69,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
@@ -782,7 +781,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 status = Status.RUNNING.name();
             } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
                 //wf re-run
-                reRun(cluster, jobInfo.getId(), props);
+                reRun(cluster, jobInfo.getId(), props, false);
                 status = Status.RUNNING.name();
             }
             break;
@@ -1292,22 +1291,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props) throws FalconException {
+    public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
 
         ProxyOozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
             Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            if (props == null || props.isEmpty()) {
-                jobprops.put(OozieClient.RERUN_FAIL_NODES, "true");
-            } else {
-                for (Entry<Object, Object> entry : props.entrySet()) {
-                    jobprops.put(entry.getKey(), entry.getValue());
-                }
-                if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES)
-                        && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
-                    jobprops.put(OozieClient.RERUN_FAIL_NODES, "true");
-                }
+            if (props != null) {
+                jobprops.putAll(props);
+            }
+            // If the user has set either of these Oozie rerun properties, the force-rerun flag is ignored.
+            if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES) && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
+                jobprops.put(OozieClient.RERUN_FAIL_NODES, !isForced);
             }
             jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
             jobprops.remove(OozieClient.BUNDLE_APP_PATH);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 51ccc5f..8327abf 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -72,7 +72,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
             LOG.info("Late changes detected in the following feeds: {}", detectLate);
 
-            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null);
+            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 685afe2..61aa3e1 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -52,8 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                     + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
-            handler.getWfEngine().reRun(message.getClusterName(),
-                    message.getWfId(), null);
+            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false);
         } catch (Exception e) {
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
                     .getProperty("max.retry.failure.count", "1"));