You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2014/12/26 13:10:53 UTC
incubator-falcon git commit: FALCON-935. Feed and process late rerun failed in falcon trunk. Contributed by Shaik Idris Ali
Repository: incubator-falcon
Updated Branches:
refs/heads/master 14686731f -> df66fb02b
FALCON-935. Feed and process late rerun failed in falcon trunk. Contributed by Shaik Idris Ali
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/df66fb02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/df66fb02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/df66fb02
Branch: refs/heads/master
Commit: df66fb02ba055fa37699bed12e11831bc41a61b8
Parents: 1468673
Author: Suhas V <su...@inmobi.com>
Authored: Fri Dec 26 17:40:38 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Fri Dec 26 17:40:38 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../workflow/engine/AbstractWorkflowEngine.java | 2 +-
.../workflow/engine/OozieWorkflowEngine.java | 21 ++++++++------------
.../falcon/rerun/handler/LateRerunConsumer.java | 2 +-
.../falcon/rerun/handler/RetryConsumer.java | 3 +--
5 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 820a368..2316094 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-935 Feed and process late rerun failed in falcon trunk
+ (Shaik Idris Ali via Suhas Vasu)
+
FALCON-914 Add option to search for Entities. (Ajay Yadav via Srikanth
Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index bbf5a30..f5b142b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -59,7 +59,7 @@ public abstract class AbstractWorkflowEngine {
public abstract String delete(Entity entity, String cluster) throws FalconException;
- public abstract void reRun(String cluster, String wfId, Properties props) throws FalconException;
+ public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException;
public abstract void dryRun(Entity entity, String clusterName) throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 62c1457..f1f96b1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -69,7 +69,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
@@ -782,7 +781,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
status = Status.RUNNING.name();
} else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
//wf re-run
- reRun(cluster, jobInfo.getId(), props);
+ reRun(cluster, jobInfo.getId(), props, false);
status = Status.RUNNING.name();
}
break;
@@ -1292,22 +1291,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public void reRun(String cluster, String jobId, Properties props) throws FalconException {
+ public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
ProxyOozieClient client = OozieClientFactory.get(cluster);
try {
WorkflowJob jobInfo = client.getJobInfo(jobId);
Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
- if (props == null || props.isEmpty()) {
- jobprops.put(OozieClient.RERUN_FAIL_NODES, "true");
- } else {
- for (Entry<Object, Object> entry : props.entrySet()) {
- jobprops.put(entry.getKey(), entry.getValue());
- }
- if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES)
- && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
- jobprops.put(OozieClient.RERUN_FAIL_NODES, "true");
- }
+ if (props != null) {
+ jobprops.putAll(props);
+ }
+ //if user has set any of these oozie rerun properties then force rerun flag is ignored
+ if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES) && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
+ jobprops.put(OozieClient.RERUN_FAIL_NODES, !isForced);
}
jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
jobprops.remove(OozieClient.BUNDLE_APP_PATH);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 51ccc5f..8327abf 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -72,7 +72,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
LOG.info("Late changes detected in the following feeds: {}", detectLate);
- handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null);
+ handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true);
LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
message.getWfId(), message.getClusterName());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/df66fb02/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 685afe2..61aa3e1 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -52,8 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
+ " At time: {}",
(message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
- handler.getWfEngine().reRun(message.getClusterName(),
- message.getWfId(), null);
+ handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false);
} catch (Exception e) {
int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
.getProperty("max.retry.failure.count", "1"));