You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/06 11:40:06 UTC
[1/2] falcon git commit: FALCON-1719 Retry does not update the state of the instance in the database
Repository: falcon
Updated Branches:
refs/heads/master 5e80dcd0d -> ccb6df38b
FALCON-1719 Retry does not update the state of the instance in the database
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7cde36c4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7cde36c4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7cde36c4
Branch: refs/heads/master
Commit: 7cde36c4104bcb43b913cbd1c22288daa773f488
Parents: 5e80dcd
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 15:13:59 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 15:13:59 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../falcon/rerun/handler/AbstractRerunConsumer.java | 11 ++++++++---
.../falcon/rerun/handler/AbstractRerunHandler.java | 15 +++++++++++++--
.../falcon/rerun/handler/LateRerunConsumer.java | 9 +++++----
.../falcon/rerun/handler/LateRerunHandler.java | 5 ++---
.../apache/falcon/rerun/handler/RetryConsumer.java | 4 ++--
6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 599efca..2ed1ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
OPTIMIZATIONS
BUG FIXES
+ FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao)
+
FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)
FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 582cb15..f60b927 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -26,6 +26,7 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,9 +75,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
// Login the user to access WfEngine as this user
CurrentUser.authenticate(message.getWorkflowUser());
- String jobStatus = handler.getWfEngine().getWorkflowStatus(
+ AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+ message.getEntityName());
+ String jobStatus = wfEngine.getWorkflowStatus(
message.getClusterName(), message.getWfId());
- handleRerun(message.getClusterName(), jobStatus, message);
+ handleRerun(message.getClusterName(), jobStatus, message,
+ message.getEntityType(), message.getEntityName());
} catch (Throwable e) {
LOG.error("Error in rerun consumer", e);
@@ -84,5 +88,6 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
}
}
- protected abstract void handleRerun(String clusterName, String jobStatus, T message);
+ protected abstract void handleRerun(String clusterName, String jobStatus, T message,
+ String entityType, String entityName);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 64c566e..bc1f7f2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -17,9 +17,11 @@
*/
package org.apache.falcon.rerun.handler;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Retry;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -58,8 +60,17 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
String wfId, String workflowUser, long msgReceivedTime);
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
- public AbstractWorkflowEngine getWfEngine() {
- return wfEngine;
+ public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) {
+ if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
+ return wfEngine;
+ }
+ try {
+ Entity entity = EntityUtil.getEntity(EntityType.valueOf(entityType), entityName);
+ return WorkflowEngineFactory.getWorkflowEngine(entity);
+ } catch (FalconException e) {
+ // Just to make sure of backward compatibility in case of any exceptions.
+ return wfEngine;
+ }
}
public boolean offerToQueue(T event) throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index ee31952..4297788 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -53,7 +53,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
@Override
protected void handleRerun(String clusterName, String jobStatus,
- LaterunEvent message) {
+ LaterunEvent message, String entityType, String entityName) {
try {
if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
|| jobStatus.equals("SUSPENDED")) {
@@ -77,7 +77,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
LOG.info("Late changes detected in the following feeds: {}", detectLate);
- handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true);
+ handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, true);
LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
message.getWfId(), message.getClusterName());
} catch (Exception e) {
@@ -91,8 +91,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
public String detectLate(LaterunEvent message) throws Exception {
LateDataHandler late = new LateDataHandler();
- Properties properties = handler.getWfEngine().getWorkflowProperties(
- message.getClusterName(), message.getWfId());
+ AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+ message.getEntityName());
+ Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
String falconInputFeedStorageTypes =
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 64177a4..1d2ed37 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -65,9 +65,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
Long wait = getEventDelay(entity, nominalTime);
if (wait == -1) {
LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
-
- java.util.Properties properties =
- this.getWfEngine().getWorkflowProperties(cluster, wfId);
+ AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
+ java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
String logDir = properties.getProperty("logDir");
String srcClusterName = properties.getProperty("srcClusterName");
Path lateLogPath = this.getLateLogPath(logDir,
http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 61aa3e1..96300d9 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -39,7 +39,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
@Override
protected void handleRerun(String clusterName, String jobStatus,
- RetryEvent message) {
+ RetryEvent message, String entityType, String entityName) {
try {
if (!jobStatus.equals("KILLED")) {
LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
@@ -52,7 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
+ " At time: {}",
(message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
- handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false);
+ handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false);
} catch (Exception e) {
int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
.getProperty("max.retry.failure.count", "1"));
[2/2] falcon git commit: FALCON-1720 Rerun API does not rerun succeeded instances (by Pavan Kolamuri)
Posted by pa...@apache.org.
FALCON-1720 Rerun API does not rerun succeeded instances (by Pavan Kolamuri)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccb6df38
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccb6df38
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccb6df38
Branch: refs/heads/master
Commit: ccb6df38bf086135d28bce9c0c92d6fd23e19459
Parents: 7cde36c
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 15:35:31 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 15:35:31 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../falcon/workflow/engine/OozieDAGEngine.java | 24 +++++++++++++++-----
.../AbstractSchedulerManagerJerseyIT.java | 2 +-
.../InstanceSchedulerManagerJerseyIT.java | 12 +++++++++-
.../local-process-noinputs-template.xml | 2 +-
5 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ed1ab4..5677175 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
OPTIMIZATIONS
BUG FIXES
+ FALCON-1720 Rerun API does not rerun succeeded instances (Pavan Kolamuri via Pallavi Rao)
+
FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao)
FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index 1425a97..1d0e126 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -237,17 +237,29 @@ public class OozieDAGEngine implements DAGEngine {
String jobId = instance.getExternalID();
try {
WorkflowJob jobInfo = client.getJobInfo(jobId);
- Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
- if (props != null) {
- jobprops.putAll(props);
+ if (props == null) {
+ props = new Properties();
}
//if user has set any of these oozie rerun properties then force rerun flag is ignored
- if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
- && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
- jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+ if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
+ && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+ props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
}
+
+ Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
+ jobprops.putAll(props);
+
jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+ // In case if both props exists one should be removed otherwise it will fail.
+ // This case will occur when user runs workflow with skip-nodes property and
+ // try to do force rerun or rerun with fail-nodes property.
+ if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+ && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+ LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES
+ + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES);
+ jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+ }
client.reRun(jobId, jobprops);
assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED);
LOG.info("Rerun job {} of entity {} of time {} on cluster {}", jobId, instance.getEntity().getName(),
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
index f053b76..0a3e984 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -62,7 +62,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
private static final String DB_BASE_DIR = "target/test-data/falcondb";
protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
- protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+ protected static final String DB_SQL_FILE = dbLocation + File.separator + "out.sql";
protected LocalFileSystem localFS = new LocalFileSystem();
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index 1523b76..18c36ff 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -64,7 +64,7 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
}
@Test
- public void testKillInstances() throws Exception {
+ public void testKillAndRerunInstances() throws Exception {
UnitTestContext context = new UnitTestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -84,6 +84,16 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
processName, START_INSTANCE);
Assert.assertEquals(status, InstancesResult.WorkflowStatus.KILLED);
+ result = falconUnitClient.rerunInstances(EntityType.PROCESS.toString(),
+ processName, START_INSTANCE, END_TIME, colo, null, null, null, null, true, null);
+ assertStatus(result);
+
+ waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+ InstancesResult.WorkflowStatus.RUNNING);
+ status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+ processName, START_INSTANCE);
+ Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING);
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/resources/local-process-noinputs-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/local-process-noinputs-template.xml b/webapp/src/test/resources/local-process-noinputs-template.xml
index aabdc6a..a7388d8 100644
--- a/webapp/src/test/resources/local-process-noinputs-template.xml
+++ b/webapp/src/test/resources/local-process-noinputs-template.xml
@@ -38,5 +38,5 @@
<property name="sundayThisWeek" value="${currentWeek('SUN', 0, 0)}"/>
</properties>
<workflow engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
- <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+ <retry policy="periodic" delay="minutes(10)" attempts="1"/>
</process>