You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/04 21:14:56 UTC
oozie git commit: OOZIE-2127 Add created time to RecoveryService WF
queries
Repository: oozie
Updated Branches:
refs/heads/master 5ab4c2515 -> b5a4e06ba
OOZIE-2127 Add created time to RecoveryService WF queries
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b5a4e06b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b5a4e06b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b5a4e06b
Branch: refs/heads/master
Commit: b5a4e06ba0ad5e0ac63bd876223182cfbd2a219c
Parents: 5ab4c25
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Wed Feb 4 12:14:24 2015 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Wed Feb 4 12:14:24 2015 -0800
----------------------------------------------------------------------
.../org/apache/oozie/WorkflowActionBean.java | 2 +-
.../jpa/WorkflowActionQueryExecutor.java | 2 ++
.../apache/oozie/service/RecoveryService.java | 12 ++++++-
core/src/main/resources/oozie-default.xml | 8 +++++
.../oozie/service/TestRecoveryService.java | 33 ++++++++++++++------
release-log.txt | 1 +
6 files changed, 47 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 06edf53..a6cf74a 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -104,7 +104,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
- @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 'RUNNING'"),
+ @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 'RUNNING' AND a.createdTimeTS >= :createdTime"),
@NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select a.id from WorkflowActionBean a where a.pending = 1 AND a.statusStr = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 2c459e4..0e99ae2 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -209,7 +209,9 @@ public class WorkflowActionQueryExecutor extends
case GET_PENDING_ACTIONS:
Long minimumPendingAgeSecs = (Long) parameters[0];
Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
+ Timestamp createdTimeInterval = new Timestamp((Long) parameters[1]);
query.setParameter("pendingAge", pts);
+ query.setParameter("createdTime", createdTimeInterval);
break;
case GET_ACTIONS_FOR_WORKFLOW_RERUN:
query.setParameter("wfId", parameters[0]);
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index 9f31e88..4b4a3f2 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -96,6 +96,9 @@ public class RecoveryService implements Service {
* Age of actions to queue, in seconds.
*/
public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
+
+ public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";
+
/**
* Age of coordinator jobs to recover, in seconds.
*/
@@ -111,6 +114,9 @@ public class RecoveryService implements Service {
private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
+ public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;
+
+
/**
* RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
@@ -334,10 +340,14 @@ public class RecoveryService implements Service {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
// queue command for action recovery
+
+ long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
+ * ONE_DAY_MILLISCONDS;
+
List<WorkflowActionBean> actions = null;
try {
actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
- olderThan);
+ olderThan, createdTimeInterval);
}
catch (JPAExecutorException ex) {
log.warn("Exception while reading pending actions from storage", ex);
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index fcc73b8..17da11b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -333,6 +333,14 @@
</property>
<property>
+ <name>oozie.service.RecoveryService.wf.actions.created.time.interval</name>
+ <value>7</value>
+ <description>
+ Only workflow actions created within this many days are eligible to be queued for recovery.
+ </description>
+ </property>
+
+ <property>
<name>oozie.service.RecoveryService.callable.batch.size</name>
<value>10</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 62d14a0..9f8e65f 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -56,7 +56,9 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
@@ -210,14 +212,21 @@ public class TestRecoveryService extends XDataTestCase {
*/
public void testWorkflowActionRecoveryUserRetry() throws Exception {
final JPAService jpaService = Services.get().get(JPAService.class);
- WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
- WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.USER_RETRY);
+ WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.USER_RETRY);
+
+ WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ WorkflowActionBean action2 = createWorkflowActionSetPending(job2.getId(), WorkflowAction.Status.USER_RETRY);
+ //Default recovery created time is 7 days.
+ action2.setCreatedTime(new Date(new Date().getTime() - 8 * RecoveryService.ONE_DAY_MILLISCONDS));
+ WorkflowActionInsertJPAExecutor actionInsertCmd = new WorkflowActionInsertJPAExecutor(action2);
+ jpaService.execute(actionInsertCmd);
Runnable recoveryRunnable = new RecoveryRunnable(0, 60, 60);
recoveryRunnable.run();
sleep(3000);
- final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
+ final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
waitFor(5000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -225,18 +234,23 @@ public class TestRecoveryService extends XDataTestCase {
return a.getExternalId() != null;
}
});
- action = jpaService.execute(wfActionGetCmd);
- assertNotNull(action.getExternalId());
- assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
+ action1 = jpaService.execute(wfActionGetCmd);
+ assertNotNull(action1.getExternalId());
+ assertEquals(WorkflowAction.Status.RUNNING, action1.getStatus());
+
+ //Action 2 should not be recovered, as its created time is older than 7 days
+ action2= WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action2.getId());
+ assertNull(action2.getExternalId());
+ assertEquals(WorkflowAction.Status.USER_RETRY, action2.getStatus());
- ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+ JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
String user = conf.get("user.name");
String group = conf.get("group.name");
JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
- String launcherId = action.getExternalId();
+ String launcherId = action1.getExternalId();
final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
@@ -854,6 +868,7 @@ public class TestRecoveryService extends XDataTestCase {
action.setType("map-reduce");
action.setTransition("transition");
action.setStatus(status);
+ action.setCreatedTime(new Date());
action.setStartTime(new Date());
action.setEndTime(new Date());
action.setLastCheckTime(new Date());
http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 2db4bb3..2126234 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-2127 Add created time to RecoveryService WF queries (puru)
OOZIE-2123 Disable launcher uber mode if classloader options are set (ryota)
OOZIE-2118 add createdtime option to workflow jobs query (ryota)
OOZIE-2110 cancel delegation token of launcher jobs that stay till child jobs finish (ryota)