You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/09/01 23:41:53 UTC
oozie git commit: OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs
Repository: oozie
Updated Branches:
refs/heads/master 5a598039a -> 3f7a0c562
OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3f7a0c56
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3f7a0c56
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3f7a0c56
Branch: refs/heads/master
Commit: 3f7a0c562252e55d16cde15991a00293b1122efc
Parents: 5a59803
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Sep 1 14:43:12 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Sep 1 14:43:12 2015 -0700
----------------------------------------------------------------------
.../org/apache/oozie/CoordinatorActionBean.java | 4 +-
.../coord/CoordPushDependencyCheckXCommand.java | 14 +++
.../executor/jpa/CoordActionQueryExecutor.java | 29 ++++-
.../CoordActionsGetForRecoveryJPAExecutor.java | 125 -------------------
...dActionsGetReadyGroupbyJobIDJPAExecutor.java | 72 -----------
.../apache/oozie/service/RecoveryService.java | 39 +++---
.../oozie/service/TestRecoveryService.java | 52 ++++++--
release-log.txt | 1 +
8 files changed, 110 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 85b7ed4..c3d4bb4 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -143,7 +143,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
- @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
// Select query used by rerun, requires almost all columns so select * is used
@@ -161,8 +161,6 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
- @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"),
-
@NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')"),
@NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')")
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index cc34627..fbb2c6c 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -100,6 +100,20 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
@Override
protected Void execute() throws CommandException {
+ // this action should only get processed if current time > nominal time;
+ // otherwise, requeue this action for delay execution;
+ Date nominalTime = coordAction.getNominalTime();
+ Date currentTime = new Date();
+ if (nominalTime.compareTo(currentTime) > 0) {
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), Math.max((nominalTime.getTime() - currentTime
+ .getTime()), getCoordPushCheckRequeueInterval()));
+ updateCoordAction(coordAction, false);
+ LOG.info("[" + actionId
+ + "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. Current="
+ + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
+ return null;
+ }
+
String pushMissingDeps = coordAction.getPushMissingDependencies();
if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index c6a60a1..6c7f4be 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -57,7 +57,9 @@ public class CoordActionQueryExecutor extends
GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
GET_TERMINATED_ACTIONS_FOR_DATES,
GET_TERMINATED_ACTION_IDS_FOR_DATES,
- GET_ACTIVE_ACTIONS_FOR_DATES
+ GET_ACTIVE_ACTIONS_FOR_DATES,
+ GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
+ GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
};
private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -199,6 +201,13 @@ public class CoordActionQueryExecutor extends
query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
break;
+ case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
+ query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+ break;
+ case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
+ query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+ query.setParameter("currentTime", new Timestamp(new Date().getTime()));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
@@ -293,6 +302,24 @@ public class CoordActionQueryExecutor extends
bean.setNominalTime((Timestamp) arr[5]);
bean.setCreatedTime((Timestamp) arr[6]);
break;
+ case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String)arr[0]);
+ bean.setJobId((String)arr[1]);
+ bean.setStatusStr((String) arr[2]);
+ bean.setExternalId((String) arr[3]);
+ bean.setPending((Integer) arr[4]);
+ break;
+ case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String)arr[0]);
+ bean.setJobId((String)arr[1]);
+ bean.setStatusStr((String) arr[2]);
+ bean.setExternalId((String) arr[3]);
+ bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
deleted file mode 100644
index bba9cc1..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.StringBlob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.ParamChecker;
-
-public class CoordActionsGetForRecoveryJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
- private long checkAgeSecs = 0;
-
- public CoordActionsGetForRecoveryJPAExecutor(final long checkAgeSecs) {
- ParamChecker.notNull(checkAgeSecs, "checkAgeSecs");
- this.checkAgeSecs = checkAgeSecs;
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
- */
- @Override
- public String getName() {
- return "CoordActionsGetForRecoveryJPAExecutor";
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
- */
- @SuppressWarnings("unchecked")
- @Override
- public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
- List<CoordinatorActionBean> allActions = new ArrayList<CoordinatorActionBean>();
-
- try {
- Query q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
- Timestamp ts = new Timestamp(System.currentTimeMillis() - this.checkAgeSecs * 1000);
- q.setParameter("lastModifiedTime", ts);
- List<Object[]> objectArrList = q.getResultList();
- for (Object[] arr : objectArrList) {
- CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForRecovery(arr);
- allActions.add(caa);
- }
-
- q = em.createNamedQuery("GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN");
- q.setParameter("lastModifiedTime", ts);
- objectArrList = q.getResultList();
- for (Object[] arr : objectArrList) {
- CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForWaiting(arr);
- allActions.add(caa);
- }
-
- return allActions;
- }
- catch (IllegalStateException e) {
- throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
-
- private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForRecovery(Object[] arr) {
- CoordinatorActionBean bean = new CoordinatorActionBean();
- if (arr[0] != null) {
- bean.setId((String) arr[0]);
- }
- if (arr[1] != null){
- bean.setJobId((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
- }
- if (arr[3] != null) {
- bean.setExternalId((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setPending((Integer) arr[4]);
- }
- return bean;
- }
-
-
- private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForWaiting(Object[] arr){
- CoordinatorActionBean bean = new CoordinatorActionBean();
- if (arr[0] != null) {
- bean.setId((String) arr[0]);
- }
- if (arr[1] != null){
- bean.setJobId((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
- }
- if (arr[3] != null) {
- bean.setExternalId((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
- }
- return bean;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
deleted file mode 100644
index 3a85e5c..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-public class CoordActionsGetReadyGroupbyJobIDJPAExecutor implements JPAExecutor<List<String>>{
- private long checkAgeSecs = 0;
-
- public CoordActionsGetReadyGroupbyJobIDJPAExecutor(final long checkAgeSecs) {
- ParamChecker.notNull(checkAgeSecs, "checkAgeSecs");
- this.checkAgeSecs = checkAgeSecs;
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
- */
- @Override
- public String getName() {
- return "CoordActionsGetReadyGroupbyJobIDJPAExecutor";
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
- */
- @Override
- public List<String> execute(EntityManager em) throws JPAExecutorException {
- List<String> jobids = new ArrayList<String>();
- try {
- Query q = em.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
- Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
- q.setParameter("lastModifiedTime", ts);
- List<Object[]> list = q.getResultList();
-
- for (Object[] arr : list) {
- if (arr != null && arr[0] != null) {
- jobids.add((String) arr[0]);
- }
- }
-
- return jobids;
- }
- catch (IllegalStateException e) {
- throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index 4b4a3f2..49f47d0 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -20,9 +20,12 @@ package org.apache.oozie.service;
import java.io.IOException;
import java.io.StringReader;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
@@ -50,8 +53,8 @@ import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
@@ -145,7 +148,6 @@ public class RecoveryService implements Service {
jpaService = Services.get().get(JPAService.class);
runWFRecovery();
runCoordActionRecovery();
- runCoordActionRecoveryForReady();
runBundleRecovery();
log.debug("QUEUING [{0}] for potential recovery", msg.toString());
boolean ret = false;
@@ -242,13 +244,20 @@ public class RecoveryService implements Service {
* Recover coordinator actions that are staying in WAITING or SUBMITTED too long
*/
private void runCoordActionRecovery() {
+ Set<String> readyJobs = new HashSet<String>();
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
long pushMissingDepDelay = pushMissingDepInterval;
- List<CoordinatorActionBean> cactions = null;
+ Timestamp ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000);
+
+ List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>();
try {
- cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
+ cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts));
+ cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts));
+
}
catch (JPAExecutorException ex) {
log.warn("Error reading coord actions from database", ex);
@@ -301,30 +310,30 @@ public class RecoveryService implements Service {
log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
}
}
+ else if (caction.getStatus() == CoordinatorActionBean.Status.READY) {
+ readyJobs.add(caction.getJobId());
+ }
}
}
catch (Exception ex) {
log.error("Exception, {0}", ex.getMessage(), ex);
}
}
-
-
+ runCoordActionRecoveryForReady(readyJobs);
}
/**
* Recover coordinator actions that are staying in READY too long
*/
- private void runCoordActionRecoveryForReady() {
+ private void runCoordActionRecoveryForReady(Set<String> jobIds) {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
-
+ List<String> coordJobIds = new ArrayList<String>(jobIds);
try {
- List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
- jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids);
- msg.append(", COORD_READY_JOBS : " + jobids.size());
- for (String jobid : jobids) {
- queueCallable(new CoordActionReadyXCommand(jobid));
-
+ coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds);
+ msg.append(", COORD_READY_JOBS : " + coordJobIds.size());
+ for (String jobid : coordJobIds) {
+ queueCallable(new CoordActionReadyXCommand(jobid));
log.info("Recover READY coord actions for jobid :" + jobid);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index c6ecd76..13d8e8d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -52,6 +52,8 @@ import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
@@ -383,9 +385,27 @@ public class TestRecoveryService extends XDataTestCase {
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
+ CoordinatorJobBean jobWithError = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNINGWITHERROR, false, true);
+
+ CoordinatorJobBean suspendedJob = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.SUSPENDED, false, true);
+
CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
+ CoordinatorActionBean actionReady = addRecordToCoordActionTableForWaiting(job.getId(), 2,
+ CoordinatorAction.Status.READY, "coord-action-for-action-input-check.xml");
+
+ CoordinatorActionBean suspendedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 1,
+ CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
+
+ CoordinatorActionBean runningWithErrorAction = addRecordToCoordActionTableForWaiting(jobWithError.getId(), 1,
+ CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
+
+ CoordinatorActionBean submittedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 2,
+ CoordinatorAction.Status.SUBMITTED, "coord-action-for-action-input-check.xml");
+
createDir(new File(getTestCaseDir(), "/2009/29/"));
createDir(new File(getTestCaseDir(), "/2009/22/"));
createDir(new File(getTestCaseDir(), "/2009/15/"));
@@ -397,24 +417,36 @@ public class TestRecoveryService extends XDataTestCase {
recoveryRunnable.run();
final String actionId = action.getId();
- final JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
waitFor(10000, new Predicate() {
public boolean evaluate() throws Exception {
- CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId);
- CoordinatorActionBean newAction = jpaService.execute(coordGetCmd);
+ CoordinatorActionBean newAction = CoordActionQueryExecutor.getInstance().get(
+ CoordActionQuery.GET_COORD_ACTION, actionId);
return (newAction.getStatus() != CoordinatorAction.Status.WAITING);
}
});
- CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId);
- action = jpaService.execute(coordGetCmd);
- if (action.getStatus() == CoordinatorAction.Status.WAITING) {
- fail("recovery waiting coord action failed, action is WAITING");
- }
- }
+ action = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId);
+ // action status should change from waiting
+ assertFalse(action.getStatus().equals(CoordinatorAction.Status.WAITING));
+ // action status should change from waiting
+ assertFalse(CoordActionQueryExecutor.getInstance()
+ .get(CoordActionQuery.GET_COORD_ACTION, runningWithErrorAction.getId()).getStatus()
+ .equals(CoordinatorAction.Status.WAITING));
+ // action status should change from waiting
+ assertFalse(CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionReady.getId())
+ .getStatus().equals(CoordinatorAction.Status.READY));
+ assertTrue(CoordActionQueryExecutor.getInstance()
+
+ // action status should remain to waiting bcz job is suspended
+ .get(CoordActionQuery.GET_COORD_ACTION, suspendedAction.getId()).getStatus()
+ .equals(CoordinatorAction.Status.WAITING));
+ // action status should remain to submitted bcz job is suspended
+ assertEquals(
+ CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, submittedAction.getId())
+ .getStatus(), (CoordinatorAction.Status.SUBMITTED));
+ }
public void testCoordActionRecoveryServiceForWaitingRegisterPartition() throws Exception {
services.destroy();
http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a7c180b..7174312 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs (puru)
OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)
OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter)
OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter)