You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/02/04 21:14:56 UTC

oozie git commit: OOZIE-2127 Add created time to RecoveryService WF queries

Repository: oozie
Updated Branches:
  refs/heads/master 5ab4c2515 -> b5a4e06ba


OOZIE-2127 Add created time to RecoveryService WF queries


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b5a4e06b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b5a4e06b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b5a4e06b

Branch: refs/heads/master
Commit: b5a4e06ba0ad5e0ac63bd876223182cfbd2a219c
Parents: 5ab4c25
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Wed Feb 4 12:14:24 2015 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Wed Feb 4 12:14:24 2015 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/WorkflowActionBean.java    |  2 +-
 .../jpa/WorkflowActionQueryExecutor.java        |  2 ++
 .../apache/oozie/service/RecoveryService.java   | 12 ++++++-
 core/src/main/resources/oozie-default.xml       |  8 +++++
 .../oozie/service/TestRecoveryService.java      | 33 ++++++++++++++------
 release-log.txt                                 |  1 +
 6 files changed, 47 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 06edf53..a6cf74a 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -104,7 +104,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
 
-    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 'RUNNING'"),
+    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select a.id, a.wfId, a.statusStr, a.type, a.pendingAgeTimestamp from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.statusStr <> 'RUNNING' AND a.createdTimeTS >= :createdTime"),
 
     @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select a.id from WorkflowActionBean a where a.pending = 1 AND a.statusStr = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 2c459e4..0e99ae2 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -209,7 +209,9 @@ public class WorkflowActionQueryExecutor extends
             case GET_PENDING_ACTIONS:
                 Long minimumPendingAgeSecs = (Long) parameters[0];
                 Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
+                Timestamp createdTimeInterval = new Timestamp((Long) parameters[1]);
                 query.setParameter("pendingAge", pts);
+                query.setParameter("createdTime", createdTimeInterval);
                 break;
             case GET_ACTIONS_FOR_WORKFLOW_RERUN:
                 query.setParameter("wfId", parameters[0]);

http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index 9f31e88..4b4a3f2 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -96,6 +96,9 @@ public class RecoveryService implements Service {
      * Age of actions to queue, in seconds.
      */
     public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
+
+    public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";
+
     /**
      * Age of coordinator jobs to recover, in seconds.
      */
@@ -111,6 +114,9 @@ public class RecoveryService implements Service {
     private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
     private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
 
+    public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;
+
+
 
     /**
      * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
@@ -334,10 +340,14 @@ public class RecoveryService implements Service {
             XLog.Info.get().clear();
             XLog log = XLog.getLog(getClass());
             // queue command for action recovery
+
+            long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
+                    * ONE_DAY_MILLISCONDS;
+
             List<WorkflowActionBean> actions = null;
             try {
                 actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
-                        olderThan);
+                        olderThan, createdTimeInterval);
             }
             catch (JPAExecutorException ex) {
                 log.warn("Exception while reading pending actions from storage", ex);

http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index fcc73b8..17da11b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -333,6 +333,14 @@
     </property>
 
     <property>
+        <name>oozie.service.RecoveryService.wf.actions.created.time.interval</name>
+        <value>7</value>
+        <description>
+        Only actions created within this many days are eligible to be queued for recovery.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.service.RecoveryService.callable.batch.size</name>
         <value>10</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 62d14a0..9f8e65f 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -56,7 +56,9 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
 import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
 import org.apache.oozie.store.CoordinatorStore;
 import org.apache.oozie.store.StoreException;
@@ -210,14 +212,21 @@ public class TestRecoveryService extends XDataTestCase {
      */
     public void testWorkflowActionRecoveryUserRetry() throws Exception {
         final JPAService jpaService = Services.get().get(JPAService.class);
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.USER_RETRY);
+        WorkflowJobBean job1 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        WorkflowActionBean action1 = this.addRecordToWfActionTable(job1.getId(), "1", WorkflowAction.Status.USER_RETRY);
+
+        WorkflowJobBean job2 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        WorkflowActionBean action2 = createWorkflowActionSetPending(job2.getId(), WorkflowAction.Status.USER_RETRY);
+        //Default recovery created time is 7 days.
+        action2.setCreatedTime(new Date(new Date().getTime() - 8 * RecoveryService.ONE_DAY_MILLISCONDS));
+        WorkflowActionInsertJPAExecutor actionInsertCmd = new WorkflowActionInsertJPAExecutor(action2);
+        jpaService.execute(actionInsertCmd);
 
         Runnable recoveryRunnable = new RecoveryRunnable(0, 60, 60);
         recoveryRunnable.run();
         sleep(3000);
 
-        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
+        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
 
         waitFor(5000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -225,18 +234,23 @@ public class TestRecoveryService extends XDataTestCase {
                 return a.getExternalId() != null;
             }
         });
-        action = jpaService.execute(wfActionGetCmd);
-        assertNotNull(action.getExternalId());
-        assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
+        action1 = jpaService.execute(wfActionGetCmd);
+        assertNotNull(action1.getExternalId());
+        assertEquals(WorkflowAction.Status.RUNNING, action1.getStatus());
+
+        //Action 2 should not get recovered as its created time is older than 7 days
+        action2= WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action2.getId());
+        assertNull(action2.getExternalId());
+        assertEquals(WorkflowAction.Status.USER_RETRY, action2.getStatus());
 
-        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
+        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
         String user = conf.get("user.name");
         String group = conf.get("group.name");
         JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
-        String launcherId = action.getExternalId();
+        String launcherId = action1.getExternalId();
 
         final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
 
@@ -854,6 +868,7 @@ public class TestRecoveryService extends XDataTestCase {
         action.setType("map-reduce");
         action.setTransition("transition");
         action.setStatus(status);
+        action.setCreatedTime(new Date());
         action.setStartTime(new Date());
         action.setEndTime(new Date());
         action.setLastCheckTime(new Date());

http://git-wip-us.apache.org/repos/asf/oozie/blob/b5a4e06b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 2db4bb3..2126234 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2127 Add created time to RecoveryService WF queries (puru)
 OOZIE-2123 Disable launcher uber mode if classloader options are set (ryota)
 OOZIE-2118 add createdtime option to workflow jobs query (ryota)
 OOZIE-2110 cancel delegation token of launcher jobs that stay till child jobs finish (ryota)