You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 21:32:05 UTC
svn commit: r1376204 [2/4] - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src...
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Aug 22 19:32:02 2012
@@ -19,18 +19,15 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.FsActionExecutor;
@@ -45,15 +42,11 @@ import org.apache.oozie.command.RerunTra
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
-import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
@@ -224,7 +217,7 @@ public class CoordRerunXCommand extends
coordAction.setExternalStatus("");
coordAction.setRerunTime(new Date());
coordAction.setLastModifiedTime(new Date());
- jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
+ updateList.add(coordAction);
writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
}
@@ -241,8 +234,11 @@ public class CoordRerunXCommand extends
throws Exception {
Element eAction = XmlUtils.parseXml(actionXml);
Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
- SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group,
- LOG);
+ SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
+ SlaAppType.COORDINATOR_ACTION, user, group, LOG);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
/* (non-Javadoc)
@@ -374,26 +370,32 @@ public class CoordRerunXCommand extends
}
@Override
- public void updateJob() throws CommandException {
- try {
- // rerun a paused coordinator job will keep job status at paused and pending at previous pending
-
- if (getPrevStatus()!= null){
- Job.Status coordJobStatus = getPrevStatus();
- if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
- coordJob.setStatus(coordJobStatus);
- }
- if (prevPending) {
- coordJob.setPending();
- } else {
- coordJob.resetPending();
- }
+ public void updateJob() {
+ if (getPrevStatus()!= null){
+ Job.Status coordJobStatus = getPrevStatus();
+ if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
+ coordJob.setStatus(coordJobStatus);
}
+ if (prevPending) {
+ coordJob.setPending();
+ } else {
+ coordJob.resetPending();
+ }
+ }
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ updateList.add(coordJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -31,10 +31,9 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.ResumeTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -109,18 +108,13 @@ public class CoordResumeXCommand extends
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
+ public void updateJob() {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
coordJob.setSuspendedTime(null);
coordJob.setLastModifiedTime(new Date());
LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
+ coordJob.isPending());
- try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(coordJob);
}
/* (non-Javadoc)
@@ -161,12 +155,7 @@ public class CoordResumeXCommand extends
coordJob.resetPending();
LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
+ coordJob.getStatus());
- try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
- }
- catch (JPAExecutorException je) {
- LOG.error("Failed to update coordinator job : " + jobId, je);
- }
+ updateList.add(coordJob);
}
}
}
@@ -183,18 +172,26 @@ public class CoordResumeXCommand extends
}
}
- private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
- action.setStatus(CoordinatorActionBean.Status.RUNNING);
- action.incrementAndGetPending();
- action.setLastModifiedTime(new Date());
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
}
}
+ private void updateCoordAction(CoordinatorActionBean action) {
+ action.setStatus(CoordinatorActionBean.Status.RUNNING);
+ action.incrementAndGetPending();
+ action.setLastModifiedTime(new Date());
+ updateList.add(action);
+ }
+
/* (non-Javadoc)
* @see org.apache.oozie.command.TransitionXCommand#getJob()
*/
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -1205,4 +1205,8 @@ public class CoordSubmitXCommand extends
public Job getJob() {
return coordJob;
}
+
+ @Override
+ public void performWrites() throws CommandException {
+ }
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -31,10 +31,9 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -150,12 +149,7 @@ public class CoordSuspendXCommand extend
coordJob.resetPending();
LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
+ coordJob.getStatus());
- try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
- }
- catch (JPAExecutorException je) {
- LOG.error("Failed to update coordinator job : " + jobId, je);
- }
+ updateList.add(coordJob);
}
}
}
@@ -176,29 +170,32 @@ public class CoordSuspendXCommand extend
* @see org.apache.oozie.command.TransitionXCommand#updateJob()
*/
@Override
- public void updateJob() throws CommandException {
+ public void updateJob() {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
coordJob.setLastModifiedTime(new Date());
coordJob.setSuspendedTime(new Date());
LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
+ updateList.add(coordJob);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
+ */
+ @Override
+ public void performWrites() throws CommandException {
try {
- jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+ jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
}
- catch (JPAExecutorException e) {
- throw new CommandException(e);
+ catch (JPAExecutorException jex) {
+ throw new CommandException(jex);
}
}
- private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
+ private void updateCoordAction(CoordinatorActionBean action) {
action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateList.add(action);
}
/* (non-Javadoc)
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -126,4 +126,8 @@ public class CoordUnpauseXCommand extend
}
+ @Override
+ public void performWrites() throws CommandException {
+ }
+
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java Wed Aug 22 19:32:02 2012
@@ -17,17 +17,10 @@
*/
package org.apache.oozie.command.coord;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.command.Command;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.XLogService;
import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.Store;
-import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.util.XLog;
public abstract class CoordinatorCommand<T> extends Command<T, CoordinatorStore> {
@@ -35,9 +28,12 @@ public abstract class CoordinatorCommand
super(name, type, priority, logMask);
}
- public CoordinatorCommand(String name, String type, int priority, int logMask,
- boolean dryrun) {
- super(name, type, priority, logMask, (dryrun) ? false : true, dryrun);
+ public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore) {
+ super(name, type, priority, logMask, withStore);
+ }
+
+ public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
+ super(name, type, priority, logMask, (dryrun) ? false : withStore, dryrun);
}
/**
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,7 +18,10 @@
package org.apache.oozie.command.wf;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
+
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
@@ -27,13 +30,14 @@ import org.apache.oozie.action.ActionExe
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -57,6 +61,7 @@ public class ActionCheckXCommand extends
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
public ActionCheckXCommand(String actionId) {
this(actionId, -1);
@@ -170,17 +175,15 @@ public class ActionCheckXCommand extends
wfAction.setErrorInfo(EXEC_DATA_MISSING,
"Execution Complete, but Execution Data Missing from Action");
failJob(context);
- wfAction.setLastCheckTime(new Date());
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- return null;
+ } else {
+ wfAction.setPending();
+ queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
}
- wfAction.setPending();
- queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
}
wfAction.setLastCheckTime(new Date());
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
}
catch (ActionExecutorException ex) {
LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
@@ -197,17 +200,18 @@ public class ActionCheckXCommand extends
break;
}
wfAction.setLastCheckTime(new Date());
+ updateList = new ArrayList<JsonBean>();
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ }
+ finally {
try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
}
- return null;
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
}
LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,11 +17,14 @@
*/
package org.apache.oozie.command.wf;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
@@ -33,13 +36,13 @@ import org.apache.oozie.client.WorkflowA
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -60,6 +63,8 @@ public class ActionEndXCommand extends A
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionEndXCommand(String actionId, String type) {
super("action.end", type, 0);
@@ -169,46 +174,46 @@ public class ActionEndXCommand extends A
executor.getType());
wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
failJob(context);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- return null;
- }
- wfAction.setRetries(0);
- wfAction.setEndTime(new Date());
-
- boolean shouldHandleUserRetry = false;
- Status slaStatus = null;
- switch (wfAction.getStatus()) {
- case OK:
- slaStatus = Status.SUCCEEDED;
- break;
- case KILLED:
- slaStatus = Status.KILLED;
- break;
- case FAILED:
- slaStatus = Status.FAILED;
- shouldHandleUserRetry = true;
- break;
- case ERROR:
- LOG.info("ERROR is considered as FAILED for SLA");
- slaStatus = Status.KILLED;
- shouldHandleUserRetry = true;
- break;
- default:
- slaStatus = Status.FAILED;
- shouldHandleUserRetry = true;
- break;
- }
- if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
- LOG.debug(
- "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
- + ", Set pending=" + wfAction.getPending());
- queue(new SignalXCommand(jobId, actionId));
+ } else {
+ wfAction.setRetries(0);
+ wfAction.setEndTime(new Date());
+
+ boolean shouldHandleUserRetry = false;
+ Status slaStatus = null;
+ switch (wfAction.getStatus()) {
+ case OK:
+ slaStatus = Status.SUCCEEDED;
+ break;
+ case KILLED:
+ slaStatus = Status.KILLED;
+ break;
+ case FAILED:
+ slaStatus = Status.FAILED;
+ shouldHandleUserRetry = true;
+ break;
+ case ERROR:
+ LOG.info("ERROR is considered as FAILED for SLA");
+ slaStatus = Status.KILLED;
+ shouldHandleUserRetry = true;
+ break;
+ default:
+ slaStatus = Status.FAILED;
+ shouldHandleUserRetry = true;
+ break;
+ }
+ if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
+ LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
+ + ", Set pending=" + wfAction.getPending());
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
+ queue(new SignalXCommand(jobId, actionId));
+ }
}
-
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
}
catch (ActionExecutorException ex) {
LOG.warn(
@@ -243,20 +248,19 @@ public class ActionEndXCommand extends A
DagELFunctions.setActionInfo(wfInstance, wfAction);
wfJob.setWorkflowInstance(wfInstance);
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ }
+ finally {
try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
-
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
}
-
LOG.debug("ENDED ActionEndXCommand for action " + actionId);
return null;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,24 @@
*/
package org.apache.oozie.command.wf;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.service.ActionService;
@@ -50,6 +55,8 @@ public class ActionKillXCommand extends
private WorkflowJobBean wfJob;
private WorkflowActionBean wfAction;
private JPAService jpaService = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionKillXCommand(String actionId, String type) {
super("action.kill", type, 0);
@@ -121,11 +128,15 @@ public class ActionKillXCommand extends
wfAction.resetPending();
wfAction.setStatus(WorkflowActionBean.Status.KILLED);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
// Add SLA status event (KILLED) for WF_ACTION
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
queue(new NotificationXCommand(wfJob, wfAction));
}
catch (ActionExecutorException ex) {
@@ -134,21 +145,25 @@ public class ActionKillXCommand extends
wfAction.setErrorInfo(ex.getErrorCode().toString(),
"KILL COMMAND FAILED - exception while executing job kill");
wfJob.setStatus(WorkflowJobBean.Status.KILLED);
- try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
// What will happen to WF and COORD_ACTION, NOTIFICATION?
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
ex.getErrorCode(), ex.getMessage(), ex);
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ finally {
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
}
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,12 +17,15 @@
*/
package org.apache.oozie.command.wf;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import javax.servlet.jsp.el.ELException;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
@@ -34,14 +37,14 @@ import org.apache.oozie.client.WorkflowA
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -66,6 +69,8 @@ public class ActionStartXCommand extends
private WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public ActionStartXCommand(String actionId, String type) {
super("action.start", type, 0);
@@ -159,6 +164,7 @@ public class ActionStartXCommand extends
isUserRetry = true;
}
context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+ boolean caught = false;
try {
if (!(executor instanceof ControlNodeActionExecutor)) {
String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
@@ -169,79 +175,81 @@ public class ActionStartXCommand extends
}
}
catch (ELEvaluationException ex) {
+ caught = true;
throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
.getMessage(), ex);
}
catch (ELException ex) {
+ caught = true;
context.setErrorInfo(EL_ERROR, ex.getMessage());
LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
handleError(context, wfJob, wfAction);
- return null;
}
catch (org.jdom.JDOMException je) {
+ caught = true;
context.setErrorInfo("ParsingError", je.getMessage());
LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
handleError(context, wfJob, wfAction);
- return null;
}
catch (Exception ex) {
+ caught = true;
context.setErrorInfo(EL_ERROR, ex.getMessage());
LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
handleError(context, wfJob, wfAction);
- return null;
}
- wfAction.setErrorInfo(null, null);
- incrActionCounter(wfAction.getType(), 1);
-
- LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
- wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
- .getUserRetryInterval());
-
- Instrumentation.Cron cron = new Instrumentation.Cron();
- cron.start();
- context.setStartTime();
- executor.start(context, wfAction);
- cron.stop();
- FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
- addActionCron(wfAction.getType(), cron);
-
- wfAction.setRetries(0);
- if (wfAction.isExecutionComplete()) {
- if (!context.isExecuted()) {
- LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
- .getType());
- wfAction.setErrorInfo(EXEC_DATA_MISSING,
- "Execution Complete, but Execution Data Missing from Action");
- failJob(context);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- return null;
+ if(!caught) {
+ wfAction.setErrorInfo(null, null);
+ incrActionCounter(wfAction.getType(), 1);
+
+ LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
+ wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
+ .getUserRetryInterval());
+
+ Instrumentation.Cron cron = new Instrumentation.Cron();
+ cron.start();
+ context.setStartTime();
+ executor.start(context, wfAction);
+ cron.stop();
+ FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+ addActionCron(wfAction.getType(), cron);
+
+ wfAction.setRetries(0);
+ if (wfAction.isExecutionComplete()) {
+ if (!context.isExecuted()) {
+ LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
+ .getType());
+ wfAction.setErrorInfo(EXEC_DATA_MISSING,
+ "Execution Complete, but Execution Data Missing from Action");
+ failJob(context);
+ } else {
+ wfAction.setPending();
+ queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
+ }
}
- wfAction.setPending();
- queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
- }
- else {
- if (!context.isStarted()) {
- LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
- .getType());
- wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
- failJob(context);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- return null;
+ else {
+ if (!context.isStarted()) {
+ LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
+ .getType());
+ wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
+ failJob(context);
+ } else {
+ queue(new NotificationXCommand(wfJob, wfAction));
+ }
}
- queue(new NotificationXCommand(wfJob, wfAction));
- }
- LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
-
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
- // Add SLA status event (STARTED) for WF_ACTION
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
- SlaAppType.WORKFLOW_ACTION);
- LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
+ LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ // Add SLA status event (STARTED) for WF_ACTION
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
+ SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
+ LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
+ }
}
catch (ActionExecutorException ex) {
LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
@@ -269,27 +277,34 @@ public class ActionStartXCommand extends
// update coordinator action
new CoordActionUpdateXCommand(wfJob, 3).call();
new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
+ SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
- SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+ if(slaEvent1 != null) {
+ insertList.add(slaEvent1);
+ }
+ SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
SlaAppType.WORKFLOW_JOB);
+ if(slaEvent2 != null) {
+ insertList.add(slaEvent2);
+ }
}
catch (XException x) {
LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
}
break;
}
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ }
+ finally {
try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
@@ -299,15 +314,19 @@ public class ActionStartXCommand extends
private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
throws CommandException {
failJob(context);
- try {
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ updateList.add(wfAction);
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
+ Status.FAILED, SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent1 != null) {
+ insertList.add(slaEvent1);
+ }
+ SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
+ Status.FAILED, SlaAppType.WORKFLOW_JOB);
+ if(slaEvent2 != null) {
+ insertList.add(slaEvent2);
}
- SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION);
- SLADbXOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), Status.FAILED, SlaAppType.WORKFLOW_JOB);
// update coordinator action
new CoordActionUpdateXCommand(workflow, 3).call();
new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,18 +20,19 @@ package org.apache.oozie.command.wf;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.workflow.WorkflowException;
@@ -42,6 +43,7 @@ import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.db.SLADbXOperations;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -55,6 +57,8 @@ public class KillXCommand extends Workfl
private WorkflowJobBean wfJob;
private List<WorkflowActionBean> actionList;
private JPAService jpaService = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public KillXCommand(String wfId) {
super("kill", "kill", 1);
@@ -106,7 +110,11 @@ public class KillXCommand extends Workfl
if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
wfJob.setStatus(WorkflowJob.Status.KILLED);
- SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.KILLED, SlaAppType.WORKFLOW_JOB);
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
+ Status.KILLED, SlaAppType.WORKFLOW_JOB);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
try {
wfJob.getWorkflowInstance().kill();
}
@@ -124,7 +132,7 @@ public class KillXCommand extends Workfl
action.setPending();
action.setStatus(WorkflowActionBean.Status.KILLED);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ updateList.add(action);
queue(new ActionKillXCommand(action.getId(), action.getType()));
}
@@ -136,16 +144,21 @@ public class KillXCommand extends Workfl
action.setStatus(WorkflowActionBean.Status.KILLED);
action.resetPending();
- SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.KILLED,
- SlaAppType.WORKFLOW_ACTION);
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
+ Status.KILLED, SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
+ updateList.add(action);
}
}
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(wfJob);
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
queue(new NotificationXCommand(wfJob));
}
- catch (JPAExecutorException je) {
- throw new CommandException(je);
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
finally {
if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,25 +17,25 @@
*/
package org.apache.oozie.command.wf;
+import java.util.Collection;
import java.util.List;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
public class PurgeXCommand extends WorkflowXCommand<Void> {
private JPAService jpaService = null;
private int olderThan;
private int limit;
- private List<WorkflowJobBean> jobList = null;
+ private List<? extends JsonBean> jobList = null;
public PurgeXCommand(int olderThan, int limit) {
super("purge", "purge", 0);
@@ -49,15 +49,11 @@ public class PurgeXCommand extends Workf
int actionDeleted = 0;
if (jobList != null && jobList.size() != 0) {
- for (WorkflowJobBean w : jobList) {
- String wfId = w.getId();
- try {
- jpaService.execute(new WorkflowJobDeleteJPAExecutor(wfId));
- actionDeleted += jpaService.execute(new WorkflowActionsDeleteForPurgeJPAExecutor(wfId));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ try {
+ actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
}
LOG.debug("ENDED Workflow-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,7 +20,9 @@ package org.apache.oozie.command.wf;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,13 +38,13 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -77,6 +79,8 @@ public class ReRunXCommand extends Workf
private WorkflowJobBean wfBean;
private List<WorkflowActionBean> actions;
private JPAService jpaService;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private List<JsonBean> deleteList = new ArrayList<JsonBean>();
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
@@ -166,32 +170,36 @@ public class ReRunXCommand extends Workf
throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
}
- try {
- for (int i = 0; i < actions.size(); i++) {
- if (!nodesToSkip.contains(actions.get(i).getName())) {
- jpaService.execute(new WorkflowActionDeleteJPAExecutor(actions.get(i).getId()));
- LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
- }
- else {
- copyActionData(newWfInstance, oldWfInstance);
- }
+ for (int i = 0; i < actions.size(); i++) {
+ if (!nodesToSkip.contains(actions.get(i).getName())) {
+ deleteList.add(actions.get(i));
+ LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
}
+ else {
+ copyActionData(newWfInstance, oldWfInstance);
+ }
+ }
- wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
- wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
- wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
- wfBean.setUser(conf.get(OozieClient.USER_NAME));
- String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
- wfBean.setGroup(group);
- wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
- wfBean.setEndTime(null);
- wfBean.setRun(wfBean.getRun() + 1);
- wfBean.setStatus(WorkflowJob.Status.PREP);
- wfBean.setWorkflowInstance(newWfInstance);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+ wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
+ wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
+ wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
+ wfBean.setUser(conf.get(OozieClient.USER_NAME));
+ String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
+ wfBean.setGroup(group);
+ wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
+ wfBean.setEndTime(null);
+ wfBean.setRun(wfBean.getRun() + 1);
+ wfBean.setStatus(WorkflowJob.Status.PREP);
+ wfBean.setWorkflowInstance(newWfInstance);
+
+ try {
+ wfBean.setLastModifiedTime(new Date());
+ updateList.add(wfBean);
+ // call JPAExecutor to do the bulk writes
+ jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
}
- catch (JPAExecutorException e) {
- throw new CommandException(e);
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
}
return null;
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,20 +17,22 @@
*/
package org.apache.oozie.command.wf;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -45,6 +47,7 @@ public class ResumeXCommand extends Work
private String id;
private JPAService jpaService = null;
private WorkflowJobBean workflow = null;
+ private List<JsonBean> updateList = new ArrayList<JsonBean>();
public ResumeXCommand(String id) {
super("resume", "resume", 1);
@@ -70,7 +73,7 @@ public class ResumeXCommand extends Work
// START_MANUAL or END_RETRY or END_MANUAL
if (action.isRetryOrManual()) {
action.setPendingOnly();
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ updateList.add(action);
}
if (action.isPending()) {
@@ -102,7 +105,9 @@ public class ResumeXCommand extends Work
}
}
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
+ workflow.setLastModifiedTime(new Date());
+ updateList.add(workflow);
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
queue(new NotificationXCommand(workflow));
}
return null;
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Aug 22 19:32:02 2012
@@ -22,6 +22,7 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
@@ -145,9 +146,13 @@ public class SignalXCommand extends Work
wfJob.setStartTime(new Date());
wfJob.setWorkflowInstance(workflowInstance);
// 1. Add SLA status event for WF-JOB with status STARTED
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
+ Status.STARTED, SlaAppType.WORKFLOW_JOB);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
// 2. Add SLA registration events for all WF_ACTIONS
- SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB);
- writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
+ createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
.getGroup(), wfJob.getConf());
queue(new NotificationXCommand(wfJob));
}
@@ -195,8 +200,11 @@ public class SignalXCommand extends Work
actionToFail.resetPending();
actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
queue(new NotificationXCommand(wfJob, actionToFail));
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
- SlaAppType.WORKFLOW_ACTION);
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
+ Status.FAILED, SlaAppType.WORKFLOW_ACTION);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
updateList.add(actionToFail);
}
}
@@ -221,7 +229,11 @@ public class SignalXCommand extends Work
default: // TODO SUSPENDED
break;
}
- SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB);
+ SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
+ slaStatus, SlaAppType.WORKFLOW_JOB);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
queue(new NotificationXCommand(wfJob));
if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
@@ -354,7 +366,7 @@ public class SignalXCommand extends Work
}
@SuppressWarnings("unchecked")
- private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
+ private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
throws CommandException {
try {
Element eWfJob = XmlUtils.parseXml(wfXml);
@@ -366,7 +378,11 @@ public class SignalXCommand extends Work
eSla = XmlUtils.parseXml(slaXml);
String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
action.getAttributeValue("name") + "");
- SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group);
+ SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
+ SlaAppType.WORKFLOW_ACTION, user, group);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,6 +20,7 @@ package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.HadoopAccessorException;
@@ -37,7 +38,8 @@ import org.apache.oozie.util.ParamChecke
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.store.StoreException;
@@ -53,9 +55,11 @@ import org.apache.oozie.service.SchemaSe
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
import org.jdom.Element;
import org.jdom.Namespace;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -69,6 +73,7 @@ public class SubmitXCommand extends Work
private Configuration conf;
private String authToken;
+ private List<JsonBean> insertList = new ArrayList<JsonBean>();
public SubmitXCommand(Configuration conf, String authToken) {
super("submit", "submit", 1);
@@ -178,9 +183,15 @@ public class SubmitXCommand extends Work
// System.out.println("SlaXml :"+ slaXml);
//store.insertWorkflow(workflow);
+ insertList.add(workflow);
JPAService jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
- jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
+ try {
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
+ }
}
else {
LOG.error(ErrorCode.E0610);
@@ -223,7 +234,11 @@ public class SubmitXCommand extends Work
try {
if (slaXml != null && slaXml.length() > 0) {
Element eSla = XmlUtils.parseXml(slaXml);
- SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log);
+ SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id,
+ SlaAppType.WORKFLOW_JOB, user, group, log);
+ if(slaEvent != null) {
+ insertList.add(slaEvent);
+ }
}
}
catch (Exception e) {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,20 +17,22 @@
*/
package org.apache.oozie.command.wf;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -44,6 +46,7 @@ public class SuspendXCommand extends Wor
private final String wfid;
private WorkflowJobBean wfJobBean;
private JPAService jpaService;
+ private static List<JsonBean> updateList = new ArrayList<JsonBean>();
public SuspendXCommand(String id) {
super("suspend", "suspend", 1);
@@ -58,7 +61,9 @@ public class SuspendXCommand extends Wor
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
try {
suspendJob(this.jpaService, this.wfJobBean, this.wfid, null);
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(this.wfJobBean));
+ this.wfJobBean.setLastModifiedTime(new Date());
+ updateList.add(this.wfJobBean);
+ jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
queue(new NotificationXCommand(this.wfJobBean));
}
catch (WorkflowException e) {
@@ -121,7 +126,7 @@ public class SuspendXCommand extends Wor
else {
action.resetPendingOnly();
}
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ updateList.add(action);
}
}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java?rev=1376204&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java Wed Aug 22 19:32:02 2012
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Deletes a list of jobs (workflow, coordinator or bundle) together with their
+ * actions, and returns the total number of actions that were deleted.
+ */
+public class BulkDeleteForPurgeJPAExecutor implements JPAExecutor<Integer> {
+
+    private Collection<JsonBean> deleteList;
+
+    /**
+     * Initialize the JPAExecutor using the delete list of JSON beans
+     * @param deleteList job beans to delete; when null, execute() deletes nothing
+     */
+    public BulkDeleteForPurgeJPAExecutor(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    public BulkDeleteForPurgeJPAExecutor() {
+    }
+
+    /**
+     * Sets the delete list for JSON bean
+     *
+     * @param deleteList job beans to delete
+     */
+    public void setDeleteList(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkDeleteForPurgeJPAExecutor";
+    }
+
+    /**
+     * Removes every job in the delete list and runs the matching named query to delete its actions.
+     *
+     * @return total number of actions deleted, summed over all jobs in the list (0 for a null list)
+     * @throws JPAExecutorException E0603 wrapping any exception raised while deleting
+     */
+    @Override
+    public Integer execute(EntityManager em) throws JPAExecutorException {
+        int actionsDeleted = 0;
+        try {
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (deleteList != null) {
+                for (JsonBean entity : deleteList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    // deleting the job (wf/coord/bundle)
+                    em.remove(em.merge(entity));
+                    if (entity instanceof WorkflowJobBean) {
+                        // deleting the workflow actions for this job; += so the result covers every job
+                        Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
+                        g.setParameter("wfId", ((WorkflowJobBean) entity).getId());
+                        actionsDeleted += g.executeUpdate();
+                    }
+                    else if (entity instanceof CoordinatorJobBean) {
+                        // deleting the coord actions for this job
+                        Query g = em.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
+                        g.setParameter("jobId", ((CoordinatorJobBean) entity).getId());
+                        actionsDeleted += g.executeUpdate();
+                    }
+                    else if (entity instanceof BundleJobBean) {
+                        // deleting the bundle actions for this job
+                        Query g = em.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_BUNDLE");
+                        g.setParameter("bundleId", ((BundleJobBean) entity).getId());
+                        actionsDeleted += g.executeUpdate();
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e);
+        }
+        return actionsDeleted;
+    }
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java?rev=1376204&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java Wed Aug 22 19:32:02 2012
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Class for updating and deleting beans in bulk
+ */
+public class BulkUpdateDeleteJPAExecutor implements JPAExecutor<Void> {
+
+ private Collection<JsonBean> updateList;
+ private Collection<JsonBean> deleteList;
+ private boolean forRerun = true;
+
+ /**
+ * Initialize the JPAExecutor using the update and delete list of JSON beans
+ * @param deleteList
+ * @param updateList
+ */
+ public BulkUpdateDeleteJPAExecutor(Collection<JsonBean> updateList, Collection<JsonBean> deleteList,
+ boolean forRerun) {
+ this.updateList = updateList;
+ this.deleteList = deleteList;
+ this.forRerun = forRerun;
+ }
+
+ public BulkUpdateDeleteJPAExecutor() {
+ }
+
+ /**
+ * Sets the update list for JSON bean
+ *
+ * @param updateList
+ */
+ public void setUpdateList(Collection<JsonBean> updateList) {
+ this.updateList = updateList;
+ }
+
+ /**
+ * Sets the delete list for JSON bean
+ *
+ * @param deleteList
+ */
+ public void setDeleteList(Collection<JsonBean> deleteList) {
+ this.deleteList = deleteList;
+ }
+
+ /**
+ * Sets whether for RerunX command or no. Else it'd be for ChangeX
+ *
+ * @param forRerun
+ */
+ public void setForRerun(boolean forRerun) {
+ this.forRerun = forRerun;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "BulkUpdateDeleteJPAExecutor";
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+ * EntityManager)
+ */
+ @Override
+ public Void execute(EntityManager em) throws JPAExecutorException {
+ try {
+ if (updateList != null) {
+ for (JsonBean entity : updateList) {
+ ParamChecker.notNull(entity, "JsonBean");
+ em.merge(entity);
+ }
+ }
+ // Only used by test cases to check for rollback of transaction
+ FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+ if (deleteList != null) {
+ for (JsonBean entity : deleteList) {
+ ParamChecker.notNull(entity, "JsonBean");
+ if (forRerun) {
+ em.remove(em.merge(entity));
+ }
+ else {
+ Query g = em.createNamedQuery("DELETE_UNSCHEDULED_ACTION");
+ String coordActionId = ((CoordinatorActionBean) entity).getId();
+ g.setParameter("id", coordActionId);
+ int actionsDeleted = g.executeUpdate();
+ if (actionsDeleted == 0)
+ throw new JPAExecutorException(ErrorCode.E1022, coordActionId);
+ }
+ }
+ }
+ return null;
+ }
+ catch (JPAExecutorException je) {
+ throw je;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e);
+ }
+ }
+}