You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/09/01 23:41:53 UTC

oozie git commit: OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs

Repository: oozie
Updated Branches:
  refs/heads/master 5a598039a -> 3f7a0c562


OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3f7a0c56
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3f7a0c56
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3f7a0c56

Branch: refs/heads/master
Commit: 3f7a0c562252e55d16cde15991a00293b1122efc
Parents: 5a59803
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Sep 1 14:43:12 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Sep 1 14:43:12 2015 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorActionBean.java |   4 +-
 .../coord/CoordPushDependencyCheckXCommand.java |  14 +++
 .../executor/jpa/CoordActionQueryExecutor.java  |  29 ++++-
 .../CoordActionsGetForRecoveryJPAExecutor.java  | 125 -------------------
 ...dActionsGetReadyGroupbyJobIDJPAExecutor.java |  72 -----------
 .../apache/oozie/service/RecoveryService.java   |  39 +++---
 .../oozie/service/TestRecoveryService.java      |  52 ++++++--
 release-log.txt                                 |   1 +
 8 files changed, 110 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 85b7ed4..c3d4bb4 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -143,7 +143,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'READY') AND a.lastModifiedTimestamp <= :lastModifiedTime and a.nominalTimestamp <= :currentTime and a.jobId in ( select w.id from CoordinatorJobBean w where w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR')"),
 
         @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
         // Select query used by rerun, requires almost all columns so select * is used
@@ -161,8 +161,6 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
 
-         @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"),
-
          @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')"),
 
          @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')")

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index cc34627..fbb2c6c 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -100,6 +100,20 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
 
     @Override
     protected Void execute() throws CommandException {
+        // This action should only be processed once the current time has reached its nominal time;
+        // otherwise, requeue it for delayed execution.
+        Date nominalTime = coordAction.getNominalTime();
+        Date currentTime = new Date();
+        if (nominalTime.compareTo(currentTime) > 0) {
+            queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), Math.max((nominalTime.getTime() - currentTime
+                    .getTime()), getCoordPushCheckRequeueInterval()));
+            updateCoordAction(coordAction, false);
+            LOG.info("[" + actionId
+                    + "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. Current="
+                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
+            return null;
+        }
+
         String pushMissingDeps = coordAction.getPushMissingDependencies();
         if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
             LOG.info("Nothing to check. Empty push missing dependency");

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index c6a60a1..6c7f4be 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -57,7 +57,9 @@ public class CoordActionQueryExecutor extends
         GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
         GET_TERMINATED_ACTIONS_FOR_DATES,
         GET_TERMINATED_ACTION_IDS_FOR_DATES,
-        GET_ACTIVE_ACTIONS_FOR_DATES
+        GET_ACTIVE_ACTIONS_FOR_DATES,
+        GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
+        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
     };
 
     private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -199,6 +201,13 @@ public class CoordActionQueryExecutor extends
                 query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
                 query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
                 break;
+            case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
+                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+                break;
+            case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
+                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+                query.setParameter("currentTime", new Timestamp(new Date().getTime()));
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
@@ -293,6 +302,24 @@ public class CoordActionQueryExecutor extends
                 bean.setNominalTime((Timestamp) arr[5]);
                 bean.setCreatedTime((Timestamp) arr[6]);
                 break;
+            case  GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String)arr[0]);
+                bean.setJobId((String)arr[1]);
+                bean.setStatusStr((String) arr[2]);
+                bean.setExternalId((String) arr[3]);
+                bean.setPending((Integer) arr[4]);
+                break;
+            case    GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
+                arr = (Object[]) ret;
+                bean = new CoordinatorActionBean();
+                bean.setId((String)arr[0]);
+                bean.setJobId((String)arr[1]);
+                bean.setStatusStr((String) arr[2]);
+                bean.setExternalId((String) arr[3]);
+                bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
+                break;
 
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
deleted file mode 100644
index bba9cc1..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.StringBlob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.ParamChecker;
-
-public class CoordActionsGetForRecoveryJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> {
-
-    private long checkAgeSecs = 0;
-
-    public CoordActionsGetForRecoveryJPAExecutor(final long checkAgeSecs) {
-        ParamChecker.notNull(checkAgeSecs, "checkAgeSecs");
-        this.checkAgeSecs = checkAgeSecs;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
-     */
-    @Override
-    public String getName() {
-        return "CoordActionsGetForRecoveryJPAExecutor";
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException {
-        List<CoordinatorActionBean> allActions = new ArrayList<CoordinatorActionBean>();
-
-        try {
-            Query q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
-            Timestamp ts = new Timestamp(System.currentTimeMillis() - this.checkAgeSecs * 1000);
-            q.setParameter("lastModifiedTime", ts);
-            List<Object[]> objectArrList = q.getResultList();
-            for (Object[] arr : objectArrList) {
-                CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForRecovery(arr);
-                allActions.add(caa);
-            }
-
-            q = em.createNamedQuery("GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN");
-            q.setParameter("lastModifiedTime", ts);
-            objectArrList = q.getResultList();
-            for (Object[] arr : objectArrList) {
-                CoordinatorActionBean caa = getBeanForCoordinatorActionFromArrayForWaiting(arr);
-                allActions.add(caa);
-            }
-
-            return allActions;
-        }
-        catch (IllegalStateException e) {
-            throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e);
-        }
-    }
-
-    private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForRecovery(Object[] arr) {
-        CoordinatorActionBean bean = new CoordinatorActionBean();
-        if (arr[0] != null) {
-            bean.setId((String) arr[0]);
-        }
-        if (arr[1] != null){
-            bean.setJobId((String) arr[1]);
-        }
-        if (arr[2] != null) {
-            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
-        }
-        if (arr[3] != null) {
-            bean.setExternalId((String) arr[3]);
-        }
-        if (arr[4] != null) {
-            bean.setPending((Integer) arr[4]);
-        }
-        return bean;
-    }
-
-
-    private CoordinatorActionBean getBeanForCoordinatorActionFromArrayForWaiting(Object[] arr){
-        CoordinatorActionBean bean = new CoordinatorActionBean();
-        if (arr[0] != null) {
-            bean.setId((String) arr[0]);
-        }
-        if (arr[1] != null){
-            bean.setJobId((String) arr[1]);
-        }
-        if (arr[2] != null) {
-            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
-        }
-        if (arr[3] != null) {
-            bean.setExternalId((String) arr[3]);
-        }
-        if (arr[4] != null) {
-            bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
-        }
-        return bean;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
deleted file mode 100644
index 3a85e5c..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetReadyGroupbyJobIDJPAExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.ParamChecker;
-
-public class CoordActionsGetReadyGroupbyJobIDJPAExecutor implements JPAExecutor<List<String>>{
-    private long checkAgeSecs = 0;
-
-    public CoordActionsGetReadyGroupbyJobIDJPAExecutor(final long checkAgeSecs) {
-        ParamChecker.notNull(checkAgeSecs, "checkAgeSecs");
-        this.checkAgeSecs = checkAgeSecs;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
-     */
-    @Override
-    public String getName() {
-        return "CoordActionsGetReadyGroupbyJobIDJPAExecutor";
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
-     */
-    @Override
-    public List<String> execute(EntityManager em) throws JPAExecutorException {
-        List<String> jobids = new ArrayList<String>();
-        try {
-            Query q = em.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
-            Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
-            q.setParameter("lastModifiedTime", ts);
-            List<Object[]> list = q.getResultList();
-
-            for (Object[] arr : list) {
-                if (arr != null && arr[0] != null) {
-                    jobids.add((String) arr[0]);
-                }
-            }
-
-            return jobids;
-        }
-        catch (IllegalStateException e) {
-            throw new JPAExecutorException(ErrorCode.E0601, e.getMessage(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/main/java/org/apache/oozie/service/RecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/RecoveryService.java b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
index 4b4a3f2..49f47d0 100644
--- a/core/src/main/java/org/apache/oozie/service/RecoveryService.java
+++ b/core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -20,9 +20,12 @@ package org.apache.oozie.service;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.BundleActionBean;
@@ -50,8 +53,8 @@ import org.apache.oozie.command.wf.SignalXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
@@ -145,7 +148,6 @@ public class RecoveryService implements Service {
             jpaService = Services.get().get(JPAService.class);
             runWFRecovery();
             runCoordActionRecovery();
-            runCoordActionRecoveryForReady();
             runBundleRecovery();
             log.debug("QUEUING [{0}] for potential recovery", msg.toString());
             boolean ret = false;
@@ -242,13 +244,20 @@ public class RecoveryService implements Service {
          * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
          */
         private void runCoordActionRecovery() {
+            Set<String> readyJobs = new HashSet<String>();
             XLog.Info.get().clear();
             XLog log = XLog.getLog(getClass());
             long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
             long pushMissingDepDelay = pushMissingDepInterval;
-            List<CoordinatorActionBean> cactions = null;
+            Timestamp ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000);
+
+            List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>();
             try {
-                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
+                cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
+                        CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts));
+                cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
+                        CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts));
+
             }
             catch (JPAExecutorException ex) {
                 log.warn("Error reading coord actions from database", ex);
@@ -301,30 +310,30 @@ public class RecoveryService implements Service {
                                 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
                             }
                         }
+                        else if (caction.getStatus() == CoordinatorActionBean.Status.READY) {
+                            readyJobs.add(caction.getJobId());
+                        }
                     }
                 }
                 catch (Exception ex) {
                     log.error("Exception, {0}", ex.getMessage(), ex);
                 }
             }
-
-
+            runCoordActionRecoveryForReady(readyJobs);
         }
 
         /**
          * Recover coordinator actions that are staying in READY too long
          */
-        private void runCoordActionRecoveryForReady() {
+        private void runCoordActionRecoveryForReady(Set<String> jobIds) {
             XLog.Info.get().clear();
             XLog log = XLog.getLog(getClass());
-
+            List<String> coordJobIds = new ArrayList<String>(jobIds);
             try {
-                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
-                jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids);
-                msg.append(", COORD_READY_JOBS : " + jobids.size());
-                for (String jobid : jobids) {
-                        queueCallable(new CoordActionReadyXCommand(jobid));
-
+                coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds);
+                msg.append(", COORD_READY_JOBS : " + coordJobIds.size());
+                for (String jobid : coordJobIds) {
+                    queueCallable(new CoordActionReadyXCommand(jobid));
                     log.info("Recover READY coord actions for jobid :" + jobid);
                 }
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index c6ecd76..13d8e8d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -52,6 +52,8 @@ import org.apache.oozie.dependency.HCatURIHandler;
 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
@@ -383,9 +385,27 @@ public class TestRecoveryService extends XDataTestCase {
         CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
                 CoordinatorJob.Status.RUNNING, false, true);
 
+        CoordinatorJobBean jobWithError = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNINGWITHERROR, false, true);
+
+        CoordinatorJobBean suspendedJob = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.SUSPENDED, false, true);
+
         CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
                 CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
 
+        CoordinatorActionBean actionReady = addRecordToCoordActionTableForWaiting(job.getId(), 2,
+                CoordinatorAction.Status.READY, "coord-action-for-action-input-check.xml");
+
+        CoordinatorActionBean suspendedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 1,
+                CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
+
+        CoordinatorActionBean runningWithErrorAction = addRecordToCoordActionTableForWaiting(jobWithError.getId(), 1,
+                CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml");
+
+        CoordinatorActionBean submittedAction = addRecordToCoordActionTableForWaiting(suspendedJob.getId(), 2,
+                CoordinatorAction.Status.SUBMITTED, "coord-action-for-action-input-check.xml");
+
         createDir(new File(getTestCaseDir(), "/2009/29/"));
         createDir(new File(getTestCaseDir(), "/2009/22/"));
         createDir(new File(getTestCaseDir(), "/2009/15/"));
@@ -397,24 +417,36 @@ public class TestRecoveryService extends XDataTestCase {
         recoveryRunnable.run();
 
         final String actionId = action.getId();
-        final JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
 
         waitFor(10000, new Predicate() {
             public boolean evaluate() throws Exception {
-                CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId);
-                CoordinatorActionBean newAction = jpaService.execute(coordGetCmd);
+                CoordinatorActionBean newAction = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, actionId);
                 return (newAction.getStatus() != CoordinatorAction.Status.WAITING);
             }
         });
 
-        CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(actionId);
-        action = jpaService.execute(coordGetCmd);
-        if (action.getStatus() == CoordinatorAction.Status.WAITING) {
-            fail("recovery waiting coord action failed, action is WAITING");
-        }
-    }
+        action = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId);
+        // action status should change from waiting
+        assertFalse(action.getStatus().equals(CoordinatorAction.Status.WAITING));
+        // action status should change from waiting
+        assertFalse(CoordActionQueryExecutor.getInstance()
+                .get(CoordActionQuery.GET_COORD_ACTION, runningWithErrorAction.getId()).getStatus()
+                .equals(CoordinatorAction.Status.WAITING));
+        // action status should change from ready
+        assertFalse(CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionReady.getId())
+                .getStatus().equals(CoordinatorAction.Status.READY));
+        assertTrue(CoordActionQueryExecutor.getInstance()
+
+                // action status should remain WAITING because the job is suspended
+                .get(CoordActionQuery.GET_COORD_ACTION, suspendedAction.getId()).getStatus()
+                .equals(CoordinatorAction.Status.WAITING));
+        // action status should remain SUBMITTED because the job is suspended
+        assertEquals(
+                CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, submittedAction.getId())
+                        .getStatus(), (CoordinatorAction.Status.SUBMITTED));
 
+    }
 
     public void testCoordActionRecoveryServiceForWaitingRegisterPartition() throws Exception {
         services.destroy();

http://git-wip-us.apache.org/repos/asf/oozie/blob/3f7a0c56/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a7c180b..7174312 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs (puru)
 OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)
 OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter)
 OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter)