You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/03 09:38:01 UTC

svn commit: r1368811 [2/3] - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src...

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java Fri Aug  3 07:38:00 2012
@@ -18,11 +18,14 @@
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.Comparator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.BundleActionBean;
@@ -37,12 +40,12 @@ import org.apache.oozie.command.bundle.B
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
@@ -64,6 +67,7 @@ public class StatusTransitService implem
     public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
     public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
     public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
+    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
     private static int limit = -1;
     private static Date lastInstanceStartTime = null;
     private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
@@ -120,6 +124,21 @@ public class StatusTransitService implem
             }
         }
 
+        /**
+         * Returns the jobs of pendingJobList without later entries whose id was already seen (first-seen order).
+         * Ids are tracked in a HashSet: a comparator that only ever answers 0 or 1 is not a valid TreeSet order.
+         */
+        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
+            Set<String> seenIds = new HashSet<String>();
+            List<BundleJobBean> uniqueJobs = new ArrayList<BundleJobBean>(pendingJobList.size());
+            for (BundleJobBean job : pendingJobList) {
+                if (seenIds.add(job.getId())) {
+                    uniqueJobs.add(job);
+                }
+            }
+            return uniqueJobs;
+        }
+
         /**
          * Aggregate bundle actions' status to bundle jobs
          *
@@ -128,15 +147,11 @@ public class StatusTransitService implem
          */
         private void bundleTransit() throws JPAExecutorException, CommandException {
             List<BundleJobBean> pendingJobCheckList = null;
-            List<BundleJobBean> runningJobCheckList = null;
-            List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
+
             if (lastInstanceStartTime == null) {
                 LOG.info("Running bundle status service first instance");
-                // this is the first instance, we need to check for all pending jobs;
-                pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
-                runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
-                bundleLists.add(pendingJobCheckList);
-                bundleLists.add(runningJobCheckList);
+                // this is the first instance, we need to check for all pending or running jobs;
+                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
             }
             else {
                 LOG.info("Running bundle status service from last instance time =  "
@@ -153,31 +168,30 @@ public class StatusTransitService implem
                 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
                     BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
                     // Running bundle job might have pending false
-                    if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
+                    if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
+                            || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
+                            || bundle.getStatus().equals(Job.Status.PAUSED)
+                            || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
                         pendingJobCheckList.add(bundle);
                     }
                 }
-                runningJobCheckList = pendingJobCheckList;
-                bundleLists.add(pendingJobCheckList);
             }
-            aggregateBundleJobsStatus(bundleLists);
+            aggregateBundleJobsStatus(pendingJobCheckList);
         }
 
-        private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
+        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
                 CommandException {
             if (bundleLists != null) {
-                for (List<BundleJobBean> listBundleBean : bundleLists) {
-                    for (BundleJobBean bundleJob : listBundleBean) {
+                    for (BundleJobBean bundleJob : bundleLists) {
                         try {
                             String jobId = bundleJob.getId();
                             Job.Status[] bundleStatus = new Job.Status[1];
                             bundleStatus[0] = bundleJob.getStatus();
-                            List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
+                            List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
                                     jobId));
                             HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
                             boolean foundPending = false;
                             for (BundleActionBean bAction : bundleActions) {
-                                if (!bAction.isPending()) {
                                     int counter = 0;
                                     if (bundleActionStatus.containsKey(bAction.getStatus())) {
                                         counter = bundleActionStatus.get(bAction.getStatus()) + 1;
@@ -192,41 +206,36 @@ public class StatusTransitService implem
                                         LOG.info("Bundle job ["+ jobId
                                                         + "] has been killed since one of its coordinator job failed submission.");
                                     }
-                                }
-                                else {
+
+                                 if (bAction.isPending()) {
                                     foundPending = true;
-                                    break;
                                 }
                             }
 
-                            if (foundPending) {
-                                continue;
-                            }
-
-                            if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+                            if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
                                 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                         + "' from '" + bundleJob.getStatus() + "'");
-                                updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                             }
-                            else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+                            else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
                                 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                         + "' from '" + bundleJob.getStatus() + "'");
-                                updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                             }
                             else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
                                 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                         + "' from '" + bundleJob.getStatus() + "'");
-                                updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                             }
-                            else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+                            else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
                                 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                         + "' from '" + bundleJob.getStatus() + "'");
-                                updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                             }
                             else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
                                 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                         + "' from '" + bundleJob.getStatus() + "'");
-                                updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                             }
                         }
                         catch (Exception ex) {
@@ -235,7 +244,7 @@ public class StatusTransitService implem
                         }
                     }
                 }
-            }
+
         }
 
         private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
@@ -255,14 +264,14 @@ public class StatusTransitService implem
                         Job.Status[] coordStatus = new Job.Status[1];
                         coordStatus[0] = coordJob.getStatus();
                         //Get count of Coordinator actions with pending true
+                        boolean isPending = false;
                         int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
                         if (count > 0) {
-                            continue;
+                             isPending = true;
                         }
-                        // Code below this is executed only when none of the Coordinator actions are pending
-                        // Get status of Coordinator actions with pending false
+                        // Get status of Coordinator actions
                         List<CoordinatorAction.Status> coordActionStatusList = jpaService
-                                .execute(new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId));
+                                .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
                         HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
 
                         for (CoordinatorAction.Status status : coordActionStatusList) {
@@ -277,26 +286,30 @@ public class StatusTransitService implem
                         }
 
                         int nonPendingCoordActionsCount = coordActionStatusList.size();
+
                         if ((coordJob.isDoneMaterialization() || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
                                 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
                             LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
                                     + "' from '" + coordJob.getStatus() + "'");
-                            updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+                            updateCoordJob(isPending, coordJob, coordStatus[0]);
                         }
-                        else if (coordJob.isDoneMaterialization()
-                                && checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
+                        else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
                             LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
                                     + "' from '" + coordJob.getStatus() + "'");
-                            updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+                            updateCoordJob(isPending, coordJob, coordStatus[0]);
+                        }
+                        else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
+                            LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
+                                    + "' from '" + coordJob.getStatus() + "'");
+                            updateCoordJob(isPending, coordJob, coordStatus[0]);
                         }
                         else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
                             LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
                                     + "' from '" + coordJob.getStatus() + "'");
-                            updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+                            updateCoordJob(isPending, coordJob, coordStatus[0]);
                         }
-                        // checking pending flag for job when user killed or suspended the job
                         else {
-                            checkCoordPending(coordActionStatus, nonPendingCoordActionsCount, coordJob, true);
+                            checkCoordPending(isPending, coordJob, true);
                         }
                     }
                     catch (Exception ex) {
@@ -412,53 +425,165 @@ public class StatusTransitService implem
         }
 
         private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
+                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
             boolean ret = false;
-            if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
-                if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
-                    bundleStatus[0] = Job.Status.PAUSED;
-                    ret = true;
+            // Sets bundleJobStatus[0] to PAUSED or PAUSEDWITHERROR and returns true when a paused state applies.
+            // TODO - When bottom-up commands are allowed to change the status of the parent job, this
+            // function should return false if none of the bundle actions are PAUSED or
+            // PAUSEDWITHERROR.
+
+            // Top down:
+            // if the bundle job is PAUSED or PAUSEDWITHERROR it becomes PAUSEDWITHERROR when any child is in
+            // one of the error states checked below, otherwise PAUSED; this branch always returns true.
+            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
+                if (bundleActionStatus.containsKey(Job.Status.KILLED)
+                        || bundleActionStatus.containsKey(Job.Status.FAILED)
+                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
+                }
+                else {
+                    bundleJobStatus[0] = Job.Status.PAUSED;
                 }
-                else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
-                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
-                                + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
-                    // bundleStatus = Job.Status.PAUSEDWITHERROR;
-                    // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
-                    bundleStatus[0] = Job.Status.PAUSED;
+                ret = true;
+            }
+
+            // Bottom up: derive the parent's status from its children (all PAUSED, or all PAUSED/PAUSEDWITHERROR).
+            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
+                    && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
+                bundleJobStatus[0] = Job.Status.PAUSED;
+                ret = true;
+            }
+            else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+                int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
+                        .get(Job.Status.PAUSED) : 0;
+                if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
+                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
                     ret = true;
                 }
             }
+            else {
+                ret = false;
+            }
             return ret;
         }
 
+
         private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
-                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
+                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
             boolean ret = false;
-            if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
-                if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
+
+            // TODO - When bottom up cmds are allowed to change the status of parent job,
+            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
+            // false
+
+            // top down
+            // if job is suspended
+            if (bundleStatus[0] == Job.Status.SUSPENDED
+                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
+                if (bundleActionStatus.containsKey(Job.Status.KILLED)
+                        || bundleActionStatus.containsKey(Job.Status.FAILED)
+                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+                }
+                else {
                     bundleStatus[0] = Job.Status.SUSPENDED;
-                    ret = true;
                 }
-                else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
-                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
-                                + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
-                    // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
-                    // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
+                ret =true;
+            }
+
+            // bottom up
+            // Update status of parent from the status of its children
+            else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
+                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
+                int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
+                        .get(Job.Status.SUCCEEDED) : 0;
+                int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
+                        .get(Job.Status.KILLED) : 0;
+                int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
+                        .get(Job.Status.FAILED) : 0;
+                int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
+                        .get(Job.Status.DONEWITHERROR) : 0;
+
+                if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
                     bundleStatus[0] = Job.Status.SUSPENDED;
                     ret = true;
                 }
+                else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
+                        + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
+                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+                    ret = true;
+                }
             }
             return ret;
+
         }
 
+        /**
+         * Top-down check for a coordinator job that is already PAUSED or PAUSEDWITHERROR: coordStatus[0] becomes
+         * PAUSEDWITHERROR if coordActionStatus has a KILLED, FAILED or TIMEDOUT entry, otherwise PAUSED.
+         * @return true if the job was in one of the two paused states, false (status untouched) otherwise
+         */
+        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
+                int coordActionsCount, Job.Status[] coordStatus){
+            if (!coordStatus[0].equals(Job.Status.PAUSED) && !coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
+                return false;
+            }
+            boolean anyErrorAction = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+                    || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+                    || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
+            coordStatus[0] = anyErrorAction ? Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
+            return true;
+        }
         private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
-                int coordActionsCount, Job.Status[] coordStatus) {
+                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
             boolean ret = false;
-            if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
-                if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
+            // Sets coordStatus[0] to SUSPENDED or SUSPENDEDWITHERROR and returns true when a suspended state applies.
+            // TODO - When bottom-up commands are allowed to change the status of the parent job:
+            // if none of the coord actions are SUSPENDED or SUSPENDEDWITHERROR and materialization
+            // is not done, then this function should return
+            // false.
+
+            // Top down:
+            // the job is already suspended, so only check the children for KILLED, FAILED or TIMEDOUT.
+            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
+
+                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
+                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+                }
+                else {
+                    coordStatus[0] = Job.Status.SUSPENDED;
+                }
+                ret = true;
+            }
+            // Bottom up:
+            // derive the parent's status from its children only if materialization is
+            // done and no action is pending (isPending is false).
+            else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
+                int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
+                       .get(CoordinatorAction.Status.SUCCEEDED) : 0;
+                int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
+                        .get(CoordinatorAction.Status.KILLED) : 0;
+                int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
+                        .get(CoordinatorAction.Status.FAILED) : 0;
+                int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
+                        .get(CoordinatorAction.Status.TIMEDOUT) : 0;
+
+                if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
                     coordStatus[0] = Job.Status.SUSPENDED;
                     ret = true;
                 }
+                else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
+                        + succeededActions + killedActions + failedActions + timedoutActions) {
+                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+                    ret = true;
+                }
             }
             return ret;
         }
@@ -466,24 +591,16 @@ public class StatusTransitService implem
         private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                 int coordActionsCount, Job.Status[] coordStatus) {
             boolean ret = false;
-            if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
-                // If all the bundle actions are succeeded then bundle job should be succeeded.
-                if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
-                    coordStatus[0] = Job.Status.RUNNING;
-                    ret = true;
+            if (coordStatus[0] != Job.Status.PREP) { // any non-PREP job is reported as running; PREP is left untouched
+                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
+                    coordStatus[0] = Job.Status.RUNNINGWITHERROR; // a KILLED, FAILED or TIMEDOUT entry is present
                 }
-                else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
-                    if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
-                            || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
-                                    .get(CoordinatorAction.Status.KILLED) > 0)
-                            || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
-                                    .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
-                        // coordStatus = Job.Status.RUNNINGWITHERROR;
-                        // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
-                        coordStatus[0] = Job.Status.RUNNING;
-                        ret = true;
-                    }
+                else {
+                    coordStatus[0] = Job.Status.RUNNING; // none of those error entries is present
                 }
+                ret = true; // coordStatus[0] now holds RUNNING or RUNNINGWITHERROR
             }
             return ret;
         }
@@ -491,48 +608,30 @@ public class StatusTransitService implem
         private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
             boolean ret = false;
-            if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
-                // If all the bundle actions are succeeded then bundle job should be succeeded.
-                if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
-                    bundleStatus[0] = Job.Status.RUNNING;
-                    ret = true;
+            if (bundleStatus[0] != Job.Status.PREP) {
+                if (bundleActionStatus.containsKey(Job.Status.FAILED)
+                        || bundleActionStatus.containsKey(Job.Status.KILLED)
+                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
+                    bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
                 }
-                else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
-                    if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
-                            || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
-                                    .get(Job.Status.KILLED) > 0)
-                            || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
-                                    .get(Job.Status.DONEWITHERROR) > 0)
-                            || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
-                                    .get(Job.Status.RUNNINGWITHERROR) > 0)) {
-                        // bundleStatus = Job.Status.RUNNINGWITHERROR;
-                        // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
-                        bundleStatus[0] = Job.Status.RUNNING;
-                        ret = true;
-                    }
+                else {
+                    bundleStatus[0] = Job.Status.RUNNING;
                 }
+                ret = true;
             }
             return ret;
+
         }
 
-        private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
-                List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
+        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
                 throws JPAExecutorException {
             String jobId = bundleJob.getId();
-            boolean pendingBundleJob = bundleJob.isPending();
-            // Checking the bundle pending should be updated or not
-            int totalNonPendingActions = 0;
-            for (Job.Status js : bundleActionStatus.keySet()) {
-                totalNonPendingActions += bundleActionStatus.get(js);
-            }
-
-            if (totalNonPendingActions == bundleActions.size()) {
-                pendingBundleJob = false;
-            }
-
             // Update the Bundle Job
-            bundleJob.setStatus(bundleStatus);
-            if (pendingBundleJob) {
+            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
+            // PAUSEDWITHERROR are not supported
+            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
+            if (isPending) {
                 bundleJob.setPending();
                 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
             }
@@ -543,22 +642,24 @@ public class StatusTransitService implem
             jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
         }
 
-        private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
-                int coordActionsCount, CoordinatorJobBean coordJob, Job.Status coordStatus)
+        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
                 throws JPAExecutorException, CommandException {
             Job.Status prevStatus = coordJob.getStatus();
             // Update the Coord Job
             if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
                     || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
-                if (coordStatus == Job.Status.SUSPENDED) {
+                if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
                     LOG.info("Coord Job [" + coordJob.getId()
-                            + "] status can not be updated as its already in Terminal state");
+                            + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
                     return;
                 }
             }
 
-            checkCoordPending(coordActionStatus, coordActionsCount, coordJob, false);
-            coordJob.setStatus(coordStatus);
+            checkCoordPending(isPending, coordJob, false);
+            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR are
+            // not supported
+            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
+            // Backward support when coordinator namespace is 0.1
             coordJob.setStatus(StatusUtils.getStatus(coordJob));
             coordJob.setLastModifiedTime(new Date());
             jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
@@ -571,20 +672,9 @@ public class StatusTransitService implem
             }
         }
 
-        private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
-                int coordActionsCount, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
-            boolean pendingCoordJob = coordJob.isPending();
+        private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
             // Checking the coordinator pending should be updated or not
-            int totalNonPendingActions = 0;
-            for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
-                totalNonPendingActions += coordActionStatus.get(js);
-            }
-
-            if (totalNonPendingActions == coordActionsCount) {
-                pendingCoordJob = false;
-            }
-
-            if (pendingCoordJob) {
+            if (isPending) {
                 coordJob.setPending();
                 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
             }
@@ -638,7 +728,11 @@ public class StatusTransitService implem
                         }
                     }
                     // Running coord job might have pending false
-                    if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
+                    Job.Status coordJobStatus = coordJob.getStatus();
+                    if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
+                            || coordJobStatus.equals(Job.Status.RUNNING)
+                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
+                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                         pendingJobCheckList.add(coordJob);
                     }
                 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,10 +46,12 @@ public class StatusUtils {
                     if (coordJob.getStatus() == Job.Status.DONEWITHERROR) {
                         newStatus = Job.Status.SUCCEEDED;
                     }
-                    else if (coordJob.getStatus() == Job.Status.PAUSED) {
+                    else if (coordJob.getStatus() == Job.Status.PAUSED
+                            || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
                         newStatus = Job.Status.RUNNING;
                     }
-                    else if (coordJob.getStatus() == Job.Status.RUNNING && coordJob.isDoneMaterialization()) {
+                    else if ((coordJob.getStatus() == Job.Status.RUNNING || coordJob.getStatus() == Job.Status.RUNNINGWITHERROR)
+                            && coordJob.isDoneMaterialization()) {
                         newStatus = Job.Status.SUCCEEDED;
                     }
                     else if (coordJob.getStatus() == Job.Status.PREPSUSPENDED) {
@@ -147,4 +149,31 @@ public class StatusUtils {
         }
         return ret;
     }
+
+    /**
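+     * Map a job status to its pre-"WITHERROR" equivalent when backward support (Oozie 3.2 and before) is
+     * enabled, i.e. when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR are not supported.
+     * @param currentJobStatus the status computed for the job
+     * @return the mapped status if backward support is enabled, otherwise currentJobStatus unchanged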
+     */
+    public static Job.Status getStatusIfBackwardSupportTrue(Job.Status currentJobStatus) {
+        Job.Status newStatus = currentJobStatus;
+        Configuration conf = Services.get().getConf();
+        boolean backwardSupportForStatesWithoutError = conf.getBoolean(
+                StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, true);
+        if (backwardSupportForStatesWithoutError) {
+            if (currentJobStatus == Job.Status.PAUSEDWITHERROR) {
+                newStatus = Job.Status.PAUSED;
+            }
+            else if (currentJobStatus == Job.Status.SUSPENDEDWITHERROR) {
+                newStatus = Job.Status.SUSPENDED;
+            }
+            else if (currentJobStatus == Job.Status.RUNNINGWITHERROR) {
+                newStatus = Job.Status.RUNNING;
+            }
+        }
+
+        return newStatus;
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Fri Aug  3 07:38:00 2012
@@ -1265,6 +1265,18 @@
         </description>
     </property>
     
+    <property>
+        <name>oozie.service.StatusTransitService.backward.support.for.states.without.error</name>
+        <value>true</value>
+        <description>
+            Set to true to keep the Oozie 3.2 status transitions.
+            Change it to false for Oozie 4.x releases.
+            If set to true, the states RUNNINGWITHERROR,
+            SUSPENDEDWITHERROR and PAUSEDWITHERROR are not used
+            for coordinator and bundle jobs.
+        </description>
+    </property>
+
     <!-- PauseTransitService -->
     <property>
         <name>oozie.service.PauseTransitService.PauseTransit.interval</name>

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -77,6 +77,25 @@ public class TestBundleJobSuspendXComman
     }
 
     /**
+     * Test : Suspend bundle job in RUNNINGWITHERROR state
+     *
+     * @throws Exception
+     */
+    public void testBundleSuspendWithError() throws Exception {
+        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNINGWITHERROR, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(bundleJobGetCmd);
+        assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+
+        new BundleJobSuspendXCommand(job.getId()).call();
+
+        job = jpaService.execute(bundleJobGetCmd);
+        assertEquals(Job.Status.SUSPENDEDWITHERROR, job.getStatus());
+    }
+    /**
      * Test : Suspend bundle job
      *
      * @throws Exception

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +29,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
 import org.apache.oozie.test.XDataTestCase;
 
 public class TestBundleRerunXCommand extends XDataTestCase {
@@ -73,6 +74,7 @@ public class TestBundleRerunXCommand ext
         assertEquals(Job.Status.RUNNING, job.getStatus());
     }
 
+
     /**
      * Test : Rerun bundle job for coordScope
      *
@@ -98,6 +100,35 @@ public class TestBundleRerunXCommand ext
     }
 
     /**
+     * Test : Rerun a DONEWITHERROR bundle job. Status should
+     * change to RUNNINGWITHERROR
+     *
+     * @throws Exception
+     */
+    public void testBundleRerunWithError() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        new Services().init();
+        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.DONEWITHERROR, false);
+        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED);
+        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.FAILED);
+        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false);
+        addRecordToCoordJobTable("action2", CoordinatorJob.Status.FAILED, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
+
+        new BundleRerunXCommand(job.getId(), null, "2009-02-01T00:00Z", false, true).call();
+
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+    }
+
+
+    /**
      * Test : Rerun PREP bundle job
      *
      * @throws Exception
@@ -148,6 +179,35 @@ public class TestBundleRerunXCommand ext
     }
 
     /**
+     * Test : Rerun PAUSEDWITHERROR bundle job. Status shouldn't change.
+     *
+     * @throws Exception
+     */
+    public void testBundleRerunInPausedWithError() throws Exception {
+        Date curr = new Date();
+        Date pauseTime = new Date(curr.getTime() - 1000);
+        BundleJobBean job = this.addRecordToBundleJobTableWithPausedTime(Job.Status.PAUSEDWITHERROR, false, pauseTime);
+        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.FAILED);
+        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.PAUSED);
+        addRecordToCoordJobTable("action1", CoordinatorJob.Status.FAILED, false, false);
+        addRecordToCoordJobTable("action2", CoordinatorJob.Status.PAUSED, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+
+        new BundleRerunXCommand(job.getId(), "action2", null, false, true).call();
+
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+        assertNotNull(job.getPauseTime());
+        assertFalse(job.isPending());
+    }
+
+
+    /**
      * Test : Rerun suspended bundle job
      *
      * @throws Exception
@@ -171,6 +231,34 @@ public class TestBundleRerunXCommand ext
         assertEquals(Job.Status.RUNNING, job.getStatus());
     }
 
+    /**
+     * Test : Rerun SUSPENDEDWITHERROR bundle job
+     *
+     * @throws Exception
+     */
+    public void testBundleRerunInSuspendedWithError() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        new Services().init();
+        BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUSPENDEDWITHERROR, false);
+        this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUSPENDED);
+        this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUSPENDEDWITHERROR);
+        addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false);
+        addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.SUSPENDEDWITHERROR, job.getStatus());
+
+        new BundleRerunXCommand(job.getId(), "action2", null, false, true).call();
+
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+    }
+
+
     protected BundleJobBean addRecordToBundleJobTableWithPausedTime(Job.Status jobStatus, boolean pending, Date pausedTime) throws Exception {
         BundleJobBean bundle = createBundleJob(jobStatus, pending);
         bundle.setPauseTime(pausedTime);

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Fri Aug  3 07:38:00 2012
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
 import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
@@ -259,6 +260,32 @@ public class TestCoordChangeXCommand ext
     }
 
     /**
+     * Change the pause time and end time of a failed coordinator job. Check whether the status changes
+     * to RUNNINGWITHERROR
+     * @throws Exception
+     */
+    public void testCoordChangeStatus() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        new Services().init();
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (20 * 60 * 1000));
+
+        final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.FAILED, startTime, endTime,
+                true, true, 0);
+
+        String pauseTime = DateUtils.convertDateToString(startTime.getTime() + 10 * 60 * 1000);
+        String newEndTime = DateUtils.convertDateToString(startTime.getTime() + 40 * 60 * 1000);
+
+        new CoordChangeXCommand(job.getId(), "endtime=" + newEndTime + ";pausetime=" + pauseTime).call();
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId());
+        CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.RUNNINGWITHERROR, coordJob.getStatus());
+    }
+
+    /**
      * test pause time change : pending should mark false if job is running with
      * pending true. two actions should be removed for pause time changes.
      *

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Fri Aug  3 07:38:00 2012
@@ -703,6 +703,31 @@ public class TestCoordRerunXCommand exte
         }
     }
 
+
+    /**
+     * Test : Rerun DONEWITHERROR coordinator job
+     *
+     * @throws Exception
+     */
+    public void testCoordRerunInDoneWithError() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        new Services().init();
+        CoordinatorJobBean job = this.addRecordToCoordJobTable(Job.Status.DONEWITHERROR, false, false);
+        addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(coordJobGetExecutor);
+        assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
+
+        new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_RERUN_DATE, "2009-12-15T01:00Z", false, true)
+                .call();
+        job = jpaService.execute(coordJobGetExecutor);
+        assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+
+    }
     /**
      * Test : Rerun paused coordinator job
      *
@@ -729,6 +754,31 @@ public class TestCoordRerunXCommand exte
     }
 
     /**
+     * Test : Rerun PAUSEDWITHERROR coordinator job
+     *
+     * @throws Exception
+     */
+    public void testCoordRerunInPausedWithError() throws Exception {
+        Date curr = new Date();
+        Date pauseTime = new Date(curr.getTime() - 1000);
+        CoordinatorJobBean job = this.addRecordToCoordJobTableWithPausedTime(Job.Status.PAUSEDWITHERROR, false, false, pauseTime);
+        addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(coordJobGetExecutor);
+        assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+
+        new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_RERUN_DATE, "2009-12-15T01:00Z", false, true)
+                .call();
+
+        job = jpaService.execute(coordJobGetExecutor);
+        assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+        assertNotNull(job.getPauseTime());
+    }
+
+    /**
      * Negative Test : rerun <jobId> -action 1 -nocleanup. Coordinator job is killed, so no actions are able to rerun.
      *
      * @throws Exception

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,6 +67,30 @@ public class TestCoordResumeXCommand ext
         assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNING);
     }
 
+
+    /**
+     * Test : suspend a RUNNINGWITHERROR coordinator job and check that the status returns to RUNNINGWITHERROR on resume
+     *
+     * @throws Exception
+     */
+    public void testCoordSuspendWithErrorAndResumeWithErrorForRunning() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(coordJobGetCmd);
+        assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
+
+        new CoordSuspendXCommand(job.getId()).call();
+        job = jpaService.execute(coordJobGetCmd);
+        assertEquals(job.getStatus(), CoordinatorJob.Status.SUSPENDEDWITHERROR);
+
+        new CoordResumeXCommand(job.getId()).call();
+        job = jpaService.execute(coordJobGetCmd);
+        assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
+    }
+
     /**
      * Test : suspend a PREP coordinator job and resume to PREP
      *

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import java.io.Writer;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.Job;
@@ -369,6 +370,7 @@ public class TestCoordSubmitXCommand ext
      * @throws Exception
      */
     public void testBasicSubmitWithBundleId() throws Exception {
+        BundleJobBean coordJob = addRecordToBundleJobTable(Job.Status.PREP, false);
         Configuration conf = new XConfiguration();
         String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
         String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
@@ -390,15 +392,15 @@ public class TestCoordSubmitXCommand ext
         conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
         conf.set(OozieClient.USER_NAME, getTestUser());
 
-        this.addRecordToBundleActionTable("OOZIE-B", "COORD-NAME", 0, Job.Status.PREP);
+        this.addRecordToBundleActionTable(coordJob.getId(), "COORD-NAME", 0, Job.Status.PREP);
 
-        CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", "OOZIE-B", "COORD-NAME");
+        CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", coordJob.getId(), "COORD-NAME");
         String jobId = sc.call();
 
         assertEquals(jobId.substring(jobId.length() - 2), "-C");
         CoordinatorJobBean job = checkCoordJobs(jobId);
         if (job != null) {
-            assertEquals("OOZIE-B", job.getBundleId());
+            assertEquals(coordJob.getId(), job.getBundleId());
             assertEquals("COORD-NAME", job.getAppName());
             assertEquals("uri:oozie:coordinator:0.2", job.getAppNamespace());
         } else {

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java Fri Aug  3 07:38:00 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,7 +65,26 @@ public class TestCoordSuspendXCommand ex
         job = jpaService.execute(coordJobGetCmd);
         assertEquals(job.getStatus(), CoordinatorJob.Status.SUSPENDED);
     }
-    
+
+    /**
+     * Test : suspending a RUNNINGWITHERROR coordinator job moves it to SUSPENDEDWITHERROR
+     *
+     * @throws Exception if a JPA lookup or the suspend command fails
+     */
+    public void testCoordSuspendWithErrorPostive() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(coordJobGetCmd);
+        assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, job.getStatus());
+
+        new CoordSuspendXCommand(job.getId()).call();
+        job = jpaService.execute(coordJobGetCmd);
+        assertEquals(CoordinatorJob.Status.SUSPENDEDWITHERROR, job.getStatus());
+    }
+
     /**
      * Negative Test : suspend a SUCCEEDED coordinator job
      *

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java Fri Aug  3 07:38:00 2012
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.util.List;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestCoordJobGetActionsStatusByPendingFalseJPAExecutor extends XDataTestCase {
-    Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-        cleanUpDBTables();
-        LocalOozie.start();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        LocalOozie.stop();
-        services.destroy();
-        super.tearDown();
-    }
-
-    /*
-     * Add a Coordinator action with pending false and check for expected column values
-     */
-    public void testCoordActionsStatusByPendingFalseForColumnValues() throws Exception {
-        int actionNum = 1;
-        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        String jobId = job.getId();
-        CoordinatorActionBean action = addRecordToCoordActionTable(jobId, actionNum++,
-                CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-
-        _testCoordActionForCorrectColumnValues(jobId, action.getStatus());
-
-    }
-
-    /*
-     * Add 3 Coordinator actions with pending false and 1 Coordinator action with pending true.
-     * Then check for expected number of actions retrieved
-     */
-    public void testCoordActionsStatusByPendingFalseForSize() throws Exception{
-        int actionNum = 1;
-        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        String jobId = job.getId();
-        addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-        addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
-        addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-        addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 1);
-
-        _testCoordActionsPendingSize(jobId, 3);
-    }
-
-    // test sql projection operation
-    private void _testCoordActionForCorrectColumnValues(String jobId, CoordinatorAction.Status status) throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        // Call JPAExecutor to get actions which are pending
-        CoordJobGetActionsStatusByPendingFalseJPAExecutor actionGetCmd = new CoordJobGetActionsStatusByPendingFalseJPAExecutor(
-                jobId);
-        List<CoordinatorAction.Status> actionList = jpaService.execute(actionGetCmd);
-        CoordinatorAction.Status cAStatus = actionList.get(0);
-
-        assertEquals(cAStatus, status);
-
-    }
-
-    // test sql selection operation
-    private void _testCoordActionsPendingSize(String jobId, int expectedSize) throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        // Call JPAExecutor to get actions which are pending
-        CoordJobGetActionsStatusByPendingFalseJPAExecutor actionGetCmd = new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId);
-        List<CoordinatorAction.Status> actionList = jpaService.execute(actionGetCmd);
-        // As 3 actions are not pending, expected result set is of size 3
-        assertEquals(actionList.size(), expectedSize);
-
-    }
-
-}