You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/03 09:38:01 UTC
svn commit: r1368811 [2/3] - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src...
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java Fri Aug 3 07:38:00 2012
@@ -18,11 +18,14 @@
package org.apache.oozie.service;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
@@ -37,12 +40,12 @@ import org.apache.oozie.command.bundle.B
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
@@ -64,6 +67,7 @@ public class StatusTransitService implem
public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
+ public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
private static int limit = -1;
private static Date lastInstanceStartTime = null;
private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
@@ -120,6 +124,21 @@ public class StatusTransitService implem
}
}
+ public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
+ Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
+ @Override
+ public int compare(BundleJobBean b1, BundleJobBean b2) {
+ if (b1.getId().equals(b2.getId())) {
+ return 0;
+ }
+ else
+ return 1;
+ }
+ });
+ s.addAll(pendingJobList);
+ return new ArrayList<BundleJobBean>(s);
+ }
+
/**
* Aggregate bundle actions' status to bundle jobs
*
@@ -128,15 +147,11 @@ public class StatusTransitService implem
*/
private void bundleTransit() throws JPAExecutorException, CommandException {
List<BundleJobBean> pendingJobCheckList = null;
- List<BundleJobBean> runningJobCheckList = null;
- List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
+
if (lastInstanceStartTime == null) {
LOG.info("Running bundle status service first instance");
- // this is the first instance, we need to check for all pending jobs;
- pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
- runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
- bundleLists.add(pendingJobCheckList);
- bundleLists.add(runningJobCheckList);
+ // this is the first instance, we need to check for all pending or running jobs;
+ pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
}
else {
LOG.info("Running bundle status service from last instance time = "
@@ -153,31 +168,30 @@ public class StatusTransitService implem
for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
// Running bundle job might have pending false
- if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
+ if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
+ || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
+ || bundle.getStatus().equals(Job.Status.PAUSED)
+ || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
pendingJobCheckList.add(bundle);
}
}
- runningJobCheckList = pendingJobCheckList;
- bundleLists.add(pendingJobCheckList);
}
- aggregateBundleJobsStatus(bundleLists);
+ aggregateBundleJobsStatus(pendingJobCheckList);
}
- private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
+ private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
CommandException {
if (bundleLists != null) {
- for (List<BundleJobBean> listBundleBean : bundleLists) {
- for (BundleJobBean bundleJob : listBundleBean) {
+ for (BundleJobBean bundleJob : bundleLists) {
try {
String jobId = bundleJob.getId();
Job.Status[] bundleStatus = new Job.Status[1];
bundleStatus[0] = bundleJob.getStatus();
- List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
+ List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
jobId));
HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
boolean foundPending = false;
for (BundleActionBean bAction : bundleActions) {
- if (!bAction.isPending()) {
int counter = 0;
if (bundleActionStatus.containsKey(bAction.getStatus())) {
counter = bundleActionStatus.get(bAction.getStatus()) + 1;
@@ -192,41 +206,36 @@ public class StatusTransitService implem
LOG.info("Bundle job ["+ jobId
+ "] has been killed since one of its coordinator job failed submission.");
}
- }
- else {
+
+ if (bAction.isPending()) {
foundPending = true;
- break;
}
}
- if (foundPending) {
- continue;
- }
-
- if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+ if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
+ "' from '" + bundleJob.getStatus() + "'");
- updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+ updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
}
- else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+ else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
+ "' from '" + bundleJob.getStatus() + "'");
- updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+ updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
}
else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
+ "' from '" + bundleJob.getStatus() + "'");
- updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+ updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
}
- else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
+ else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
+ "' from '" + bundleJob.getStatus() + "'");
- updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+ updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
}
else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
+ "' from '" + bundleJob.getStatus() + "'");
- updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
+ updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
}
}
catch (Exception ex) {
@@ -235,7 +244,7 @@ public class StatusTransitService implem
}
}
}
- }
+
}
private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
@@ -255,14 +264,14 @@ public class StatusTransitService implem
Job.Status[] coordStatus = new Job.Status[1];
coordStatus[0] = coordJob.getStatus();
//Get count of Coordinator actions with pending true
+ boolean isPending = false;
int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
if (count > 0) {
- continue;
+ isPending = true;
}
- // Code below this is executed only when none of the Coordinator actions are pending
- // Get status of Coordinator actions with pending false
+ // Get status of Coordinator actions
List<CoordinatorAction.Status> coordActionStatusList = jpaService
- .execute(new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId));
+ .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
for (CoordinatorAction.Status status : coordActionStatusList) {
@@ -277,26 +286,30 @@ public class StatusTransitService implem
}
int nonPendingCoordActionsCount = coordActionStatusList.size();
+
if ((coordJob.isDoneMaterialization() || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
&& checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+ updateCoordJob(isPending, coordJob, coordStatus[0]);
}
- else if (coordJob.isDoneMaterialization()
- && checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
+ else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+ updateCoordJob(isPending, coordJob, coordStatus[0]);
+ }
+ else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
+ LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
+ + "' from '" + coordJob.getStatus() + "'");
+ updateCoordJob(isPending, coordJob, coordStatus[0]);
}
else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
+ updateCoordJob(isPending, coordJob, coordStatus[0]);
}
- // checking pending flag for job when user killed or suspended the job
else {
- checkCoordPending(coordActionStatus, nonPendingCoordActionsCount, coordJob, true);
+ checkCoordPending(isPending, coordJob, true);
}
}
catch (Exception ex) {
@@ -412,53 +425,165 @@ public class StatusTransitService implem
}
private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
- List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
+ List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
boolean ret = false;
- if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
- if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
- bundleStatus[0] = Job.Status.PAUSED;
- ret = true;
+
+ // TODO - When bottom up cmds are allowed to change the status of parent job,
+ // if none of the bundle actions are in paused or pausedwitherror, the function should return
+ // false
+
+ // top down
+ // If the bundle job is PAUSED or PAUSEDWITHERROR and no children are in error
+ // state, then job should be PAUSED otherwise it should be PAUSEDWITHERROR
+ if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
+ if (bundleActionStatus.containsKey(Job.Status.KILLED)
+ || bundleActionStatus.containsKey(Job.Status.FAILED)
+ || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+ bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
+ }
+ else {
+ bundleJobStatus[0] = Job.Status.PAUSED;
}
- else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
- && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
- + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
- // bundleStatus = Job.Status.PAUSEDWITHERROR;
- // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
- bundleStatus[0] = Job.Status.PAUSED;
+ ret = true;
+ }
+
+ // bottom up; check the status of parent through their children
+ else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
+ && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
+ bundleJobStatus[0] = Job.Status.PAUSED;
+ ret = true;
+ }
+ else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+ int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
+ .get(Job.Status.PAUSED) : 0;
+ if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
+ bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
ret = true;
}
}
+ else {
+ ret = false;
+ }
return ret;
}
+
private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
- List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
+ List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
boolean ret = false;
- if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
- if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
+
+ // TODO - When bottom up cmds are allowed to change the status of parent job,
+ // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
+ // false
+
+ // top down
+ // if job is suspended
+ if (bundleStatus[0] == Job.Status.SUSPENDED
+ || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
+ if (bundleActionStatus.containsKey(Job.Status.KILLED)
+ || bundleActionStatus.containsKey(Job.Status.FAILED)
+ || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
+ bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+ }
+ else {
bundleStatus[0] = Job.Status.SUSPENDED;
- ret = true;
}
- else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
- && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
- + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
- // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
- // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
+ ret =true;
+ }
+
+ // bottom up
+ // Update status of parent from the status of its children
+ else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
+ || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
+ int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
+ .get(Job.Status.SUCCEEDED) : 0;
+ int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
+ .get(Job.Status.KILLED) : 0;
+ int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
+ .get(Job.Status.FAILED) : 0;
+ int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
+ .get(Job.Status.DONEWITHERROR) : 0;
+
+ if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
bundleStatus[0] = Job.Status.SUSPENDED;
ret = true;
}
+ else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
+ + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
+ bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+ ret = true;
+ }
}
return ret;
+
}
+ private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
+ int coordActionsCount, Job.Status[] coordStatus){
+ boolean ret = false;
+ if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
+ if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
+ coordStatus[0] = Job.Status.PAUSEDWITHERROR;
+ }
+ else {
+ coordStatus[0] = Job.Status.PAUSED;
+ }
+ ret = true;
+ }
+ return ret;
+ }
private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
- int coordActionsCount, Job.Status[] coordStatus) {
+ int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
boolean ret = false;
- if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
- if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
+
+ // TODO - When bottom up cmds are allowed to change the status of parent job,
+ // if none of the coord actions are in suspended or suspendedwitherror and materialization done is false,
+ // then the function should return
+ // false
+
+ // top down
+ // check for children only when parent is suspended
+ if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
+
+ if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
+ coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+ }
+ else {
+ coordStatus[0] = Job.Status.SUSPENDED;
+ }
+ ret = true;
+ }
+ // bottom up
+ // look for children to check the parent's status only if materialization is
+ // done and all actions are non-pending
+ else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
+ int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
+ .get(CoordinatorAction.Status.SUCCEEDED) : 0;
+ int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
+ .get(CoordinatorAction.Status.KILLED) : 0;
+ int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
+ .get(CoordinatorAction.Status.FAILED) : 0;
+ int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
+ .get(CoordinatorAction.Status.TIMEDOUT) : 0;
+
+ if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
coordStatus[0] = Job.Status.SUSPENDED;
ret = true;
}
+ else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
+ + succeededActions + killedActions + failedActions + timedoutActions) {
+ coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
+ ret = true;
+ }
}
return ret;
}
@@ -466,24 +591,16 @@ public class StatusTransitService implem
private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
int coordActionsCount, Job.Status[] coordStatus) {
boolean ret = false;
- if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
- // If all the bundle actions are succeeded then bundle job should be succeeded.
- if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
- coordStatus[0] = Job.Status.RUNNING;
- ret = true;
+ if (coordStatus[0] != Job.Status.PREP) {
+ if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
+ || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
+ coordStatus[0] = Job.Status.RUNNINGWITHERROR;
}
- else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
- if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
- || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
- .get(CoordinatorAction.Status.KILLED) > 0)
- || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
- .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
- // coordStatus = Job.Status.RUNNINGWITHERROR;
- // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
- coordStatus[0] = Job.Status.RUNNING;
- ret = true;
- }
+ else {
+ coordStatus[0] = Job.Status.RUNNING;
}
+ ret = true;
}
return ret;
}
@@ -491,48 +608,30 @@ public class StatusTransitService implem
private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
boolean ret = false;
- if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
- // If all the bundle actions are succeeded then bundle job should be succeeded.
- if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
- bundleStatus[0] = Job.Status.RUNNING;
- ret = true;
+ if (bundleStatus[0] != Job.Status.PREP) {
+ if (bundleActionStatus.containsKey(Job.Status.FAILED)
+ || bundleActionStatus.containsKey(Job.Status.KILLED)
+ || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
+ || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
+ bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
}
- else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
- if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
- || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
- .get(Job.Status.KILLED) > 0)
- || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
- .get(Job.Status.DONEWITHERROR) > 0)
- || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
- .get(Job.Status.RUNNINGWITHERROR) > 0)) {
- // bundleStatus = Job.Status.RUNNINGWITHERROR;
- // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
- bundleStatus[0] = Job.Status.RUNNING;
- ret = true;
- }
+ else {
+ bundleStatus[0] = Job.Status.RUNNING;
}
+ ret = true;
}
return ret;
+
}
- private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
- List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
+ private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
throws JPAExecutorException {
String jobId = bundleJob.getId();
- boolean pendingBundleJob = bundleJob.isPending();
- // Checking the bundle pending should be updated or not
- int totalNonPendingActions = 0;
- for (Job.Status js : bundleActionStatus.keySet()) {
- totalNonPendingActions += bundleActionStatus.get(js);
- }
-
- if (totalNonPendingActions == bundleActions.size()) {
- pendingBundleJob = false;
- }
-
// Update the Bundle Job
- bundleJob.setStatus(bundleStatus);
- if (pendingBundleJob) {
+ // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
+ // PAUSEDWITHERROR is not supported
+ bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
+ if (isPending) {
bundleJob.setPending();
LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
}
@@ -543,22 +642,24 @@ public class StatusTransitService implem
jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
}
- private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
- int coordActionsCount, CoordinatorJobBean coordJob, Job.Status coordStatus)
+ private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
throws JPAExecutorException, CommandException {
Job.Status prevStatus = coordJob.getStatus();
// Update the Coord Job
if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
|| coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
- if (coordStatus == Job.Status.SUSPENDED) {
+ if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
LOG.info("Coord Job [" + coordJob.getId()
- + "] status can not be updated as its already in Terminal state");
+ + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
return;
}
}
- checkCoordPending(coordActionStatus, coordActionsCount, coordJob, false);
- coordJob.setStatus(coordStatus);
+ checkCoordPending(isPending, coordJob, false);
+ // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
+ // not supported
+ coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
+ // Backward support when coordinator namespace is 0.1
coordJob.setStatus(StatusUtils.getStatus(coordJob));
coordJob.setLastModifiedTime(new Date());
jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
@@ -571,20 +672,9 @@ public class StatusTransitService implem
}
}
- private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
- int coordActionsCount, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
- boolean pendingCoordJob = coordJob.isPending();
+ private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
// Checking the coordinator pending should be updated or not
- int totalNonPendingActions = 0;
- for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
- totalNonPendingActions += coordActionStatus.get(js);
- }
-
- if (totalNonPendingActions == coordActionsCount) {
- pendingCoordJob = false;
- }
-
- if (pendingCoordJob) {
+ if (isPending) {
coordJob.setPending();
LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
}
@@ -638,7 +728,11 @@ public class StatusTransitService implem
}
}
// Running coord job might have pending false
- if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
+ Job.Status coordJobStatus = coordJob.getStatus();
+ if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
+ || coordJobStatus.equals(Job.Status.RUNNING)
+ || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
+ || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
pendingJobCheckList.add(coordJob);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/StatusUtils.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,10 +46,12 @@ public class StatusUtils {
if (coordJob.getStatus() == Job.Status.DONEWITHERROR) {
newStatus = Job.Status.SUCCEEDED;
}
- else if (coordJob.getStatus() == Job.Status.PAUSED) {
+ else if (coordJob.getStatus() == Job.Status.PAUSED
+ || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
newStatus = Job.Status.RUNNING;
}
- else if (coordJob.getStatus() == Job.Status.RUNNING && coordJob.isDoneMaterialization()) {
+ else if ((coordJob.getStatus() == Job.Status.RUNNING || coordJob.getStatus() == Job.Status.RUNNINGWITHERROR)
+ && coordJob.isDoneMaterialization()) {
newStatus = Job.Status.SUCCEEDED;
}
else if (coordJob.getStatus() == Job.Status.PREPSUSPENDED) {
@@ -147,4 +149,31 @@ public class StatusUtils {
}
return ret;
}
+
+ /**
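+ * Get the status of a coordinator or bundle job for Oozie versions (3.2 and before) when RUNNINGWITHERROR,
+ * SUSPENDEDWITHERROR and PAUSEDWITHERROR are not supported
+ * @param currentJobStatus the job status to convert
+ * @return PAUSED, SUSPENDED or RUNNING in place of the matching *WITHERROR status when backward support is enabled; otherwise the status unchanged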
+ */
+ public static Job.Status getStatusIfBackwardSupportTrue(Job.Status currentJobStatus) {
+ Job.Status newStatus = currentJobStatus;
+ Configuration conf = Services.get().getConf();
+ boolean backwardSupportForStatesWithoutError = conf.getBoolean(
+ StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, true);
+ if (backwardSupportForStatesWithoutError) {
+ if (currentJobStatus == Job.Status.PAUSEDWITHERROR) {
+ newStatus = Job.Status.PAUSED;
+ }
+ else if (currentJobStatus == Job.Status.SUSPENDEDWITHERROR) {
+ newStatus = Job.Status.SUSPENDED;
+ }
+ else if (currentJobStatus == Job.Status.RUNNINGWITHERROR) {
+ newStatus = Job.Status.RUNNING;
+ }
+ }
+
+ return newStatus;
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Fri Aug 3 07:38:00 2012
@@ -1265,6 +1265,18 @@
</description>
</property>
+ <property>
+ <name>oozie.service.StatusTransitService.backward.support.for.states.without.error</name>
+ <value>true</value>
+ <description>
+ Set to true to keep the Oozie 3.2 status transitions.
+ Change it to false for Oozie 4.x releases.
+ If set to true,
+ the states RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR
+ are not used for coordinator and bundle jobs.
+ </description>
+ </property>
+
<!-- PauseTransitService -->
<property>
<name>oozie.service.PauseTransitService.PauseTransit.interval</name>
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleJobSuspendXCommand.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -77,6 +77,25 @@ public class TestBundleJobSuspendXComman
}
/**
+ * Test : Suspend bundle job in RUNNINGWITHERROR state
+ *
+ * @throws Exception
+ */
+ public void testBundleSuspendWithError() throws Exception {
+ // Seed a bundle job record in RUNNINGWITHERROR (the boolean is presumably the pending flag - confirm).
+ BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNINGWITHERROR, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Re-read the job through JPA to confirm the stored status before suspending.
+ BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(bundleJobGetCmd);
+ assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+
+ new BundleJobSuspendXCommand(job.getId()).call();
+
+ // The suspend must keep the error marker: RUNNINGWITHERROR -> SUSPENDEDWITHERROR.
+ job = jpaService.execute(bundleJobGetCmd);
+ assertEquals(Job.Status.SUSPENDEDWITHERROR, job.getStatus());
+ }
+ /**
* Test : Suspend bundle job
*
* @throws Exception
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleRerunXCommand.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +29,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
import org.apache.oozie.test.XDataTestCase;
public class TestBundleRerunXCommand extends XDataTestCase {
@@ -73,6 +74,7 @@ public class TestBundleRerunXCommand ext
assertEquals(Job.Status.RUNNING, job.getStatus());
}
+
/**
* Test : Rerun bundle job for coordScope
*
@@ -98,6 +100,35 @@ public class TestBundleRerunXCommand ext
}
/**
+ * Test : Rerun a DONEWITHERROR bundle job. Status should
+ * change to RUNNINGWITHERROR
+ *
+ * @throws Exception
+ */
+ public void testBundleRerunWithError() throws Exception {
+ // Restart the services with the backward-support flag set to false so that
+ // the *WITHERROR states are not mapped back to their plain counterparts.
+ Services.get().destroy();
+ setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+ new Services().init();
+ // Fixture: a DONEWITHERROR bundle with one SUCCEEDED and one FAILED action,
+ // plus a coordinator job record for each action name with the same status.
+ BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.DONEWITHERROR, false);
+ this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUCCEEDED);
+ this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.FAILED);
+ addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUCCEEDED, false, false);
+ addRecordToCoordJobTable("action2", CoordinatorJob.Status.FAILED, false, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Confirm the stored status before issuing the rerun.
+ BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
+
+ // Rerun with a date string only; the null second argument is presumably the
+ // coordinator scope - confirm against BundleRerunXCommand.
+ new BundleRerunXCommand(job.getId(), null, "2009-02-01T00:00Z", false, true).call();
+
+ // After the rerun the bundle is expected to be RUNNINGWITHERROR.
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+ }
+
+
+ /**
* Test : Rerun PREP bundle job
*
* @throws Exception
@@ -148,6 +179,35 @@ public class TestBundleRerunXCommand ext
}
/**
+ * Test : Rerun PAUSEDWITHERROR bundle job. Status shouldn't change.
+ *
+ * @throws Exception
+ */
+ public void testBundleRerunInPausedWithError() throws Exception {
+ // Use a pause time one second in the past.
+ Date curr = new Date();
+ Date pauseTime = new Date(curr.getTime() - 1000);
+ // Fixture: a PAUSEDWITHERROR bundle with one FAILED and one PAUSED action,
+ // plus a coordinator job record for each action name with the same status.
+ BundleJobBean job = this.addRecordToBundleJobTableWithPausedTime(Job.Status.PAUSEDWITHERROR, false, pauseTime);
+ this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.FAILED);
+ this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.PAUSED);
+ addRecordToCoordJobTable("action1", CoordinatorJob.Status.FAILED, false, false);
+ addRecordToCoordJobTable("action2", CoordinatorJob.Status.PAUSED, false, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Confirm the stored status before issuing the rerun.
+ BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+
+ // Rerun naming "action2" (presumably the coordinator scope - confirm) with no date string.
+ new BundleRerunXCommand(job.getId(), "action2", null, false, true).call();
+
+ // The rerun must leave the bundle as it was: same status, pause time still set, not pending.
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+ assertNotNull(job.getPauseTime());
+ assertFalse(job.isPending());
+ }
+
+
+ /**
* Test : Rerun suspended bundle job
*
* @throws Exception
@@ -171,6 +231,34 @@ public class TestBundleRerunXCommand ext
assertEquals(Job.Status.RUNNING, job.getStatus());
}
+ /**
+ * Test : Rerun SUSPENDEDWITHERROR bundle job
+ *
+ * @throws Exception
+ */
+ public void testBundleRerunInSuspendedWithError() throws Exception {
+ // Restart the services with the backward-support flag set to false so that
+ // the *WITHERROR states are not mapped back to their plain counterparts.
+ Services.get().destroy();
+ setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+ new Services().init();
+ // Fixture: a SUSPENDEDWITHERROR bundle with one SUSPENDED and one SUSPENDEDWITHERROR action,
+ // plus a coordinator job record for each action name with the same status.
+ BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.SUSPENDEDWITHERROR, false);
+ this.addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.SUSPENDED);
+ this.addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUSPENDEDWITHERROR);
+ addRecordToCoordJobTable("action1", CoordinatorJob.Status.SUSPENDED, false, false);
+ addRecordToCoordJobTable("action2", CoordinatorJob.Status.SUSPENDEDWITHERROR, false, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Confirm the stored status before issuing the rerun.
+ BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.SUSPENDEDWITHERROR, job.getStatus());
+
+ // Rerun naming "action2" (presumably the coordinator scope - confirm) with no date string.
+ new BundleRerunXCommand(job.getId(), "action2", null, false, true).call();
+
+ // Unlike the paused case, the rerun is expected to move the bundle to RUNNINGWITHERROR.
+ job = jpaService.execute(bundleJobGetExecutor);
+ assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+ }
+
+
protected BundleJobBean addRecordToBundleJobTableWithPausedTime(Job.Status jobStatus, boolean pending, Date pausedTime) throws Exception {
BundleJobBean bundle = createBundleJob(jobStatus, pending);
bundle.setPauseTime(pausedTime);
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Fri Aug 3 07:38:00 2012
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
@@ -259,6 +260,32 @@ public class TestCoordChangeXCommand ext
}
/**
+ * Change the pause time and end time of a failed coordinator job. Check whether the status changes
+ * to RUNNINGWITHERROR
+ * @throws Exception
+ */
+ public void testCoordChangeStatus() throws Exception {
+ // Restart the services with the backward-support flag set to false so that
+ // the *WITHERROR states are not mapped back to their plain counterparts.
+ Services.get().destroy();
+ setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+ new Services().init();
+ // The job window starts now and initially ends 20 minutes later.
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + (20 * 60 * 1000));
+
+ // Seed a FAILED coordinator job covering that window.
+ final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.FAILED, startTime, endTime,
+ true, true, 0);
+
+ // New pause time is start + 10 minutes; new end time is start + 40 minutes.
+ String pauseTime = DateUtils.convertDateToString(startTime.getTime() + 10 * 60 * 1000);
+ String newEndTime = DateUtils.convertDateToString(startTime.getTime() + 40 * 60 * 1000);
+
+ // Apply both changes in a single change command.
+ new CoordChangeXCommand(job.getId(), "endtime=" + newEndTime + ";pausetime=" + pauseTime).call();
+
+ // After the change the previously FAILED job is expected to be RUNNINGWITHERROR.
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId());
+ CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd);
+ assertEquals(Job.Status.RUNNINGWITHERROR, coordJob.getStatus());
+ }
+
+ /**
* test pause time change : pending should mark false if job is running with
* pending true. two actions should be removed for pause time changes.
*
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Fri Aug 3 07:38:00 2012
@@ -703,6 +703,31 @@ public class TestCoordRerunXCommand exte
}
}
+
+ /**
+ * Test : Rerun DONEWITHERROR coordinator job
+ *
+ * @throws Exception
+ */
+ public void testCoordRerunInDoneWithError() throws Exception {
+ // Restart the services with the backward-support flag set to false so that
+ // the *WITHERROR states are not mapped back to their plain counterparts.
+ Services.get().destroy();
+ setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+ new Services().init();
+ // Fixture: a DONEWITHERROR coordinator job with a single FAILED action.
+ CoordinatorJobBean job = this.addRecordToCoordJobTable(Job.Status.DONEWITHERROR, false, false);
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Confirm the stored status before issuing the rerun.
+ CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(coordJobGetExecutor);
+ assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
+
+ // Rerun selected by date (JOB_COORD_RERUN_DATE) for the given timestamp.
+ new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_RERUN_DATE, "2009-12-15T01:00Z", false, true)
+ .call();
+ // After the rerun the job is expected to be RUNNINGWITHERROR.
+ job = jpaService.execute(coordJobGetExecutor);
+ assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
+
+ }
/**
* Test : Rerun paused coordinator job
*
@@ -729,6 +754,31 @@ public class TestCoordRerunXCommand exte
}
/**
+ * Test : Rerun PAUSEDWITHERROR coordinator job
+ *
+ * @throws Exception
+ */
+ public void testCoordRerunInPausedWithError() throws Exception {
+ // Use a pause time one second in the past.
+ Date curr = new Date();
+ Date pauseTime = new Date(curr.getTime() - 1000);
+ // Fixture: a PAUSEDWITHERROR coordinator job with a single FAILED action.
+ CoordinatorJobBean job = this.addRecordToCoordJobTableWithPausedTime(Job.Status.PAUSEDWITHERROR, false, false, pauseTime);
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // Confirm the stored status before issuing the rerun.
+ CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(coordJobGetExecutor);
+ assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+
+ // Rerun selected by date (JOB_COORD_RERUN_DATE) for the given timestamp.
+ new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_RERUN_DATE, "2009-12-15T01:00Z", false, true)
+ .call();
+
+ // The rerun must not take the job out of PAUSEDWITHERROR, and the pause time must still be set.
+ job = jpaService.execute(coordJobGetExecutor);
+ assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
+ assertNotNull(job.getPauseTime());
+ }
+
+ /**
* Negative Test : rerun <jobId> -action 1 -nocleanup. Coordinator job is killed, so no actions are able to rerun.
*
* @throws Exception
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,6 +67,30 @@ public class TestCoordResumeXCommand ext
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNING);
}
+
+ /**
+ * Test : suspend a RUNNINGWITHERROR coordinator job and check that the status returns to RUNNINGWITHERROR on resume
+ *
+ * @throws Exception
+ */
+ public void testCoordSuspendWithErrorAndResumeWithErrorForRunning() throws Exception {
+ // Seed a coordinator job record in RUNNINGWITHERROR.
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, false, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(coordJobGetCmd);
+ // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+ assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, job.getStatus());
+
+ // Suspending must keep the error marker: RUNNINGWITHERROR -> SUSPENDEDWITHERROR.
+ new CoordSuspendXCommand(job.getId()).call();
+ job = jpaService.execute(coordJobGetCmd);
+ assertEquals(CoordinatorJob.Status.SUSPENDEDWITHERROR, job.getStatus());
+
+ // Resuming must return the job to RUNNINGWITHERROR, not to plain RUNNING.
+ new CoordResumeXCommand(job.getId()).call();
+ job = jpaService.execute(coordJobGetCmd);
+ assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, job.getStatus());
+ }
+
/**
* Test : suspend a PREP coordinator job and resume to PREP
*
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import java.io.Writer;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
@@ -369,6 +370,7 @@ public class TestCoordSubmitXCommand ext
* @throws Exception
*/
public void testBasicSubmitWithBundleId() throws Exception {
+ BundleJobBean coordJob = addRecordToBundleJobTable(Job.Status.PREP, false);
Configuration conf = new XConfiguration();
String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
@@ -390,15 +392,15 @@ public class TestCoordSubmitXCommand ext
conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
conf.set(OozieClient.USER_NAME, getTestUser());
- this.addRecordToBundleActionTable("OOZIE-B", "COORD-NAME", 0, Job.Status.PREP);
+ this.addRecordToBundleActionTable(coordJob.getId(), "COORD-NAME", 0, Job.Status.PREP);
- CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", "OOZIE-B", "COORD-NAME");
+ CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", coordJob.getId(), "COORD-NAME");
String jobId = sc.call();
assertEquals(jobId.substring(jobId.length() - 2), "-C");
CoordinatorJobBean job = checkCoordJobs(jobId);
if (job != null) {
- assertEquals("OOZIE-B", job.getBundleId());
+ assertEquals(coordJob.getId(), job.getBundleId());
assertEquals("COORD-NAME", job.getAppName());
assertEquals("uri:oozie:coordinator:0.2", job.getAppNamespace());
} else {
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordSuspendXCommand.java Fri Aug 3 07:38:00 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,7 +65,26 @@ public class TestCoordSuspendXCommand ex
job = jpaService.execute(coordJobGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.SUSPENDED);
}
-
+
+ /**
+ * Test : suspend a RUNNINGWITHERROR coordinator job
+ *
+ * @throws Exception
+ */
+ public void testCoordSuspendWithErrorPostive() throws Exception {
+ // Seed a coordinator job record in RUNNINGWITHERROR.
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, false, false);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
+ job = jpaService.execute(coordJobGetCmd);
+ // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+ assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, job.getStatus());
+
+ // Suspending must keep the error marker: RUNNINGWITHERROR -> SUSPENDEDWITHERROR.
+ new CoordSuspendXCommand(job.getId()).call();
+ job = jpaService.execute(coordJobGetCmd);
+ assertEquals(CoordinatorJob.Status.SUSPENDEDWITHERROR, job.getStatus());
+ }
+
/**
* Negative Test : suspend a SUCCEEDED coordinator job
*
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java?rev=1368811&r1=1368810&r2=1368811&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsStatusByPendingFalseJPAExecutor.java Fri Aug 3 07:38:00 2012
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.util.List;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestCoordJobGetActionsStatusByPendingFalseJPAExecutor extends XDataTestCase {
- Services services;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- cleanUpDBTables();
- LocalOozie.start();
- }
-
- @Override
- protected void tearDown() throws Exception {
- LocalOozie.stop();
- services.destroy();
- super.tearDown();
- }
-
- /*
- * Add a Coordinator action with pending false and check for expected column values
- */
- public void testCoordActionsStatusByPendingFalseForColumnValues() throws Exception {
- int actionNum = 1;
- CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- String jobId = job.getId();
- CoordinatorActionBean action = addRecordToCoordActionTable(jobId, actionNum++,
- CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-
- _testCoordActionForCorrectColumnValues(jobId, action.getStatus());
-
- }
-
- /*
- * Add 3 Coordinator actions with pending false and 1 Coordinator action with pending true.
- * Then check for expected number of actions retrieved
- */
- public void testCoordActionsStatusByPendingFalseForSize() throws Exception{
- int actionNum = 1;
- CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- String jobId = job.getId();
- addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
- addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
- addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- addRecordToCoordActionTable(jobId, actionNum++, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 1);
-
- _testCoordActionsPendingSize(jobId, 3);
- }
-
- // test sql projection operation
- private void _testCoordActionForCorrectColumnValues(String jobId, CoordinatorAction.Status status) throws Exception {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- // Call JPAExecutor to get actions which are pending
- CoordJobGetActionsStatusByPendingFalseJPAExecutor actionGetCmd = new CoordJobGetActionsStatusByPendingFalseJPAExecutor(
- jobId);
- List<CoordinatorAction.Status> actionList = jpaService.execute(actionGetCmd);
- CoordinatorAction.Status cAStatus = actionList.get(0);
-
- assertEquals(cAStatus, status);
-
- }
-
- // test sql selection operation
- private void _testCoordActionsPendingSize(String jobId, int expectedSize) throws Exception {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- // Call JPAExecutor to get actions which are pending
- CoordJobGetActionsStatusByPendingFalseJPAExecutor actionGetCmd = new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId);
- List<CoordinatorAction.Status> actionList = jpaService.execute(actionGetCmd);
- // As 3 actions are not pending, expected result set is of size 3
- assertEquals(actionList.size(), expectedSize);
-
- }
-
-}