You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 21:32:05 UTC

svn commit: r1376204 [2/4] - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src...

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Aug 22 19:32:02 2012
@@ -19,18 +19,15 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorActionInfo;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.hadoop.FsActionExecutor;
@@ -45,15 +42,11 @@ import org.apache.oozie.command.RerunTra
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.CoordUtils;
-import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.InstrumentUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
@@ -224,7 +217,7 @@ public class CoordRerunXCommand extends 
         coordAction.setExternalStatus("");
         coordAction.setRerunTime(new Date());
         coordAction.setLastModifiedTime(new Date());
-        jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
+        updateList.add(coordAction);
         writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
     }
 
@@ -241,8 +234,11 @@ public class CoordRerunXCommand extends 
             throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml);
         Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
-        SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group,
-                LOG);
+        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
+                SlaAppType.COORDINATOR_ACTION, user, group, LOG);
+        if(slaEvent != null) {
+            insertList.add(slaEvent);
+        }
     }
 
     /* (non-Javadoc)
@@ -374,26 +370,32 @@ public class CoordRerunXCommand extends 
     }
 
     @Override
-    public void updateJob() throws CommandException {
-        try {
-            // rerun a paused coordinator job will keep job status at paused and pending at previous pending
-
-            if (getPrevStatus()!= null){
-                Job.Status coordJobStatus = getPrevStatus();
-                if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
-                    coordJob.setStatus(coordJobStatus);
-                }
-                if (prevPending) {
-                    coordJob.setPending();
-                } else {
-                    coordJob.resetPending();
-                }
+    public void updateJob() {
+        if (getPrevStatus()!= null){
+            Job.Status coordJobStatus = getPrevStatus();
+            if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
+                coordJob.setStatus(coordJobStatus);
             }
+            if (prevPending) {
+                coordJob.setPending();
+            } else {
+                coordJob.resetPending();
+            }
+        }
 
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        updateList.add(coordJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
+        try {
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
         }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
         }
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -31,10 +31,9 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.ResumeTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.command.wf.ResumeXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -109,18 +108,13 @@ public class CoordResumeXCommand extends
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
+    public void updateJob() {
         InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
         coordJob.setSuspendedTime(null);
         coordJob.setLastModifiedTime(new Date());
         LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
                 + coordJob.isPending());
-        try {
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(coordJob);
     }
 
     /* (non-Javadoc)
@@ -161,12 +155,7 @@ public class CoordResumeXCommand extends
                 coordJob.resetPending();
                 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
                         + coordJob.getStatus());
-                try {
-                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
-                }
-                catch (JPAExecutorException je) {
-                    LOG.error("Failed to update coordinator job : " + jobId, je);
-                }
+                updateList.add(coordJob);
             }
         }
     }
@@ -183,18 +172,26 @@ public class CoordResumeXCommand extends
         }
     }
 
-    private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
-        action.setStatus(CoordinatorActionBean.Status.RUNNING);
-        action.incrementAndGetPending();
-        action.setLastModifiedTime(new Date());
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);
         }
     }
 
+    private void updateCoordAction(CoordinatorActionBean action) {
+        action.setStatus(CoordinatorActionBean.Status.RUNNING);
+        action.incrementAndGetPending();
+        action.setLastModifiedTime(new Date());
+        updateList.add(action);
+    }
+
     /* (non-Javadoc)
      * @see org.apache.oozie.command.TransitionXCommand#getJob()
      */

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -1205,4 +1205,8 @@ public class CoordSubmitXCommand extends
     public Job getJob() {
         return coordJob;
     }
+
+    @Override
+    public void performWrites() throws CommandException {
+    }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -31,10 +31,9 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.SuspendTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -150,12 +149,7 @@ public class CoordSuspendXCommand extend
                 coordJob.resetPending();
                 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
                         + coordJob.getStatus());
-                try {
-                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
-                }
-                catch (JPAExecutorException je) {
-                    LOG.error("Failed to update coordinator job : " + jobId, je);
-                }
+                updateList.add(coordJob);
             }
         }
     }
@@ -176,29 +170,32 @@ public class CoordSuspendXCommand extend
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
+    public void updateJob() {
         InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
         coordJob.setLastModifiedTime(new Date());
         coordJob.setSuspendedTime(new Date());
         LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
+        updateList.add(coordJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
         }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
+        catch (JPAExecutorException jex) {
+            throw new CommandException(jex);
         }
     }
 
-    private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
+    private void updateCoordAction(CoordinatorActionBean action) {
         action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        try {
-            jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(action);
     }
 
     /* (non-Javadoc)

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -126,4 +126,8 @@ public class CoordUnpauseXCommand extend
 
     }
 
+    @Override
+    public void performWrites() throws CommandException {
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java Wed Aug 22 19:32:02 2012
@@ -17,17 +17,10 @@
  */
 package org.apache.oozie.command.coord;
 
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.command.Command;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.XLogService;
 import org.apache.oozie.store.CoordinatorStore;
 import org.apache.oozie.store.Store;
-import org.apache.oozie.store.StoreException;
 import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.util.XLog;
 
 public abstract class CoordinatorCommand<T> extends Command<T, CoordinatorStore> {
 
@@ -35,9 +28,12 @@ public abstract class CoordinatorCommand
         super(name, type, priority, logMask);
     }
 
-    public CoordinatorCommand(String name, String type, int priority, int logMask,
-                              boolean dryrun) {
-        super(name, type, priority, logMask, (dryrun) ? false : true, dryrun);
+    public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore) {
+        super(name, type, priority, logMask, withStore);
+    }
+
+    public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
+        super(name, type, priority, logMask, (dryrun) ? false : withStore, dryrun);
     }
 
     /**

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,7 +18,10 @@
 package org.apache.oozie.command.wf;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
@@ -27,13 +30,14 @@ import org.apache.oozie.action.ActionExe
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.WorkflowAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -57,6 +61,7 @@ public class ActionCheckXCommand extends
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     public ActionCheckXCommand(String actionId) {
         this(actionId, -1);
@@ -170,17 +175,15 @@ public class ActionCheckXCommand extends
                     wfAction.setErrorInfo(EXEC_DATA_MISSING,
                             "Execution Complete, but Execution Data Missing from Action");
                     failJob(context);
-                    wfAction.setLastCheckTime(new Date());
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                    return null;
+                } else {
+                    wfAction.setPending();
+                    queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                 }
-                wfAction.setPending();
-                queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
             }
             wfAction.setLastCheckTime(new Date());
-            jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+            updateList.add(wfAction);
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
         }
         catch (ActionExecutorException ex) {
             LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
@@ -197,17 +200,18 @@ public class ActionCheckXCommand extends
                     break;
             }
             wfAction.setLastCheckTime(new Date());
+            updateList = new ArrayList<JsonBean>();
+            updateList.add(wfAction);
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
+        }
+        finally {
             try {
-                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);
             }
-            return null;
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
         }
 
         LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,11 +17,14 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.DagELFunctions;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
@@ -33,13 +36,13 @@ import org.apache.oozie.client.WorkflowA
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -60,6 +63,8 @@ public class ActionEndXCommand extends A
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionEndXCommand(String actionId, String type) {
         super("action.end", type, 0);
@@ -169,46 +174,46 @@ public class ActionEndXCommand extends A
                         executor.getType());
                 wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
                 failJob(context);
-                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                return null;
-            }
-            wfAction.setRetries(0);
-            wfAction.setEndTime(new Date());
-
-            boolean shouldHandleUserRetry = false;
-            Status slaStatus = null;
-            switch (wfAction.getStatus()) {
-                case OK:
-                    slaStatus = Status.SUCCEEDED;
-                    break;
-                case KILLED:
-                    slaStatus = Status.KILLED;
-                    break;
-                case FAILED:
-                    slaStatus = Status.FAILED;
-                    shouldHandleUserRetry = true;
-                    break;
-                case ERROR:
-                    LOG.info("ERROR is considered as FAILED for SLA");
-                    slaStatus = Status.KILLED;
-                    shouldHandleUserRetry = true;
-                    break;
-                default:
-                    slaStatus = Status.FAILED;
-                    shouldHandleUserRetry = true;
-                    break;
-            }
-            if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
-                SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
-                LOG.debug(
-                        "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
-                        + ", Set pending=" + wfAction.getPending());
-                queue(new SignalXCommand(jobId, actionId));
+            } else {
+                wfAction.setRetries(0);
+                wfAction.setEndTime(new Date());
+    
+                boolean shouldHandleUserRetry = false;
+                Status slaStatus = null;
+                switch (wfAction.getStatus()) {
+                    case OK:
+                        slaStatus = Status.SUCCEEDED;
+                        break;
+                    case KILLED:
+                        slaStatus = Status.KILLED;
+                        break;
+                    case FAILED:
+                        slaStatus = Status.FAILED;
+                        shouldHandleUserRetry = true;
+                        break;
+                    case ERROR:
+                        LOG.info("ERROR is considered as FAILED for SLA");
+                        slaStatus = Status.KILLED;
+                        shouldHandleUserRetry = true;
+                        break;
+                    default:
+                        slaStatus = Status.FAILED;
+                        shouldHandleUserRetry = true;
+                        break;
+                }
+                if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
+                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
+                    LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
+                            + ", Set pending=" + wfAction.getPending());
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
+                    queue(new SignalXCommand(jobId, actionId));
+                }
             }
-
-            jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+            updateList.add(wfAction);
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
         }
         catch (ActionExecutorException ex) {
             LOG.warn(
@@ -243,20 +248,19 @@ public class ActionEndXCommand extends A
             DagELFunctions.setActionInfo(wfInstance, wfAction);
             wfJob.setWorkflowInstance(wfInstance);
 
+            updateList.add(wfAction);
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
+        }
+        finally {
             try {
-                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             }
-            catch (JPAExecutorException je) {
-                throw new CommandException(je);
+            catch (JPAExecutorException e) {
+                throw new CommandException(e);
             }
-
-        }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
         }
 
-
         LOG.debug("ENDED ActionEndXCommand for action " + actionId);
         return null;
     }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,24 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.service.ActionService;
@@ -50,6 +55,8 @@ public class ActionKillXCommand extends 
     private WorkflowJobBean wfJob;
     private WorkflowActionBean wfAction;
     private JPAService jpaService = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionKillXCommand(String actionId, String type) {
         super("action.kill", type, 0);
@@ -121,11 +128,15 @@ public class ActionKillXCommand extends 
                     wfAction.resetPending();
                     wfAction.setStatus(WorkflowActionBean.Status.KILLED);
 
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+                    updateList.add(wfAction);
+                    wfJob.setLastModifiedTime(new Date());
+                    updateList.add(wfJob);
                     // Add SLA status event (KILLED) for WF_ACTION
-                    SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
+                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
                             SlaAppType.WORKFLOW_ACTION);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
                     queue(new NotificationXCommand(wfJob, wfAction));
                 }
                 catch (ActionExecutorException ex) {
@@ -134,21 +145,25 @@ public class ActionKillXCommand extends 
                     wfAction.setErrorInfo(ex.getErrorCode().toString(),
                             "KILL COMMAND FAILED - exception while executing job kill");
                     wfJob.setStatus(WorkflowJobBean.Status.KILLED);
-                    try {
-                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                    }
-                    catch (JPAExecutorException je) {
-                        throw new CommandException(je);
-                    }
+                    updateList.add(wfAction);
+                    wfJob.setLastModifiedTime(new Date());
+                    updateList.add(wfJob);
                     // What will happen to WF and COORD_ACTION, NOTIFICATION?
-                    SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
+                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
                             SlaAppType.WORKFLOW_ACTION);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
                     LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
                             ex.getErrorCode(), ex.getMessage(), ex);
                 }
-                catch (JPAExecutorException je) {
-                    throw new CommandException(je);
+                finally {
+                    try {
+                        jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+                    }
+                    catch (JPAExecutorException e) {
+                        throw new CommandException(e);
+                    }
                 }
             }
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,12 +17,15 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import javax.servlet.jsp.el.ELException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
@@ -34,14 +37,14 @@ import org.apache.oozie.client.WorkflowA
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -66,6 +69,8 @@ public class ActionStartXCommand extends
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionStartXCommand(String actionId, String type) {
         super("action.start", type, 0);
@@ -159,6 +164,7 @@ public class ActionStartXCommand extends
                 isUserRetry = true;
             }
             context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+            boolean caught = false;
             try {
                 if (!(executor instanceof ControlNodeActionExecutor)) {
                     String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
@@ -169,79 +175,81 @@ public class ActionStartXCommand extends
                 }
             }
             catch (ELEvaluationException ex) {
+                caught = true;
                 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
                         .getMessage(), ex);
             }
             catch (ELException ex) {
+                caught = true;
                 context.setErrorInfo(EL_ERROR, ex.getMessage());
                 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
                 handleError(context, wfJob, wfAction);
-                return null;
             }
             catch (org.jdom.JDOMException je) {
+                caught = true;
                 context.setErrorInfo("ParsingError", je.getMessage());
                 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
                 handleError(context, wfJob, wfAction);
-                return null;
             }
             catch (Exception ex) {
+                caught = true;
                 context.setErrorInfo(EL_ERROR, ex.getMessage());
                 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
                 handleError(context, wfJob, wfAction);
-                return null;
             }
-            wfAction.setErrorInfo(null, null);
-            incrActionCounter(wfAction.getType(), 1);
-
-            LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
-                            wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
-                                    .getUserRetryInterval());
-
-            Instrumentation.Cron cron = new Instrumentation.Cron();
-            cron.start();
-            context.setStartTime();
-            executor.start(context, wfAction);
-            cron.stop();
-            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
-            addActionCron(wfAction.getType(), cron);
-
-            wfAction.setRetries(0);
-            if (wfAction.isExecutionComplete()) {
-                if (!context.isExecuted()) {
-                    LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
-                            .getType());
-                    wfAction.setErrorInfo(EXEC_DATA_MISSING,
-                            "Execution Complete, but Execution Data Missing from Action");
-                    failJob(context);
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                    return null;
+            if(!caught) {
+                wfAction.setErrorInfo(null, null);
+                incrActionCounter(wfAction.getType(), 1);
+
+                LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
+                                wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
+                                        .getUserRetryInterval());
+
+                Instrumentation.Cron cron = new Instrumentation.Cron();
+                cron.start();
+                context.setStartTime();
+                executor.start(context, wfAction);
+                cron.stop();
+                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+                addActionCron(wfAction.getType(), cron);
+
+                wfAction.setRetries(0);
+                if (wfAction.isExecutionComplete()) {
+                    if (!context.isExecuted()) {
+                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
+                                .getType());
+                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
+                                "Execution Complete, but Execution Data Missing from Action");
+                        failJob(context);
+                    } else {
+                        wfAction.setPending();
+                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
+                    }
                 }
-                wfAction.setPending();
-                queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
-            }
-            else {
-                if (!context.isStarted()) {
-                    LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
-                            .getType());
-                    wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
-                    failJob(context);
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                    return null;
+                else {
+                    if (!context.isStarted()) {
+                        LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
+                                .getType());
+                        wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
+                        failJob(context);
+                    } else {
+                        queue(new NotificationXCommand(wfJob, wfAction));
+                    }
                 }
-                queue(new NotificationXCommand(wfJob, wfAction));
-            }
 
-            LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
-
-            jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-            // Add SLA status event (STARTED) for WF_ACTION
-            SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
-                    SlaAppType.WORKFLOW_ACTION);
-            LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
+                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
 
+                updateList.add(wfAction);
+                wfJob.setLastModifiedTime(new Date());
+                updateList.add(wfJob);
+                // Add SLA status event (STARTED) for WF_ACTION
+                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
+                        SlaAppType.WORKFLOW_ACTION);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
+                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
+            }
         }
         catch (ActionExecutorException ex) {
             LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
@@ -269,27 +277,34 @@ public class ActionStartXCommand extends
                         // update coordinator action
                         new CoordActionUpdateXCommand(wfJob, 3).call();
                         new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
-                        SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
+                        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
                                 SlaAppType.WORKFLOW_ACTION);
-                        SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
+                        if(slaEvent1 != null) {
+                            insertList.add(slaEvent1);
+                        }
+                        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
                                 SlaAppType.WORKFLOW_JOB);
+                        if(slaEvent2 != null) {
+                            insertList.add(slaEvent2);
+                        }
                     }
                     catch (XException x) {
                         LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
                     }
                     break;
             }
+            updateList.add(wfAction);
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
+        }
+        finally {
             try {
-                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
-                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             }
-            catch (JPAExecutorException je) {
-                throw new CommandException(je);
+            catch (JPAExecutorException e) {
+                throw new CommandException(e);
             }
         }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
-        }
 
         LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
 
@@ -299,15 +314,19 @@ public class ActionStartXCommand extends
     private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
             throws CommandException {
         failJob(context);
-        try {
-            jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
-        }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
+        updateList.add(wfAction);
+        wfJob.setLastModifiedTime(new Date());
+        updateList.add(wfJob);
+        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
+                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
+        if(slaEvent1 != null) {
+            insertList.add(slaEvent1);
+        }
+        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
+                Status.FAILED, SlaAppType.WORKFLOW_JOB);
+        if(slaEvent2 != null) {
+            insertList.add(slaEvent2);
         }
-        SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION);
-        SLADbXOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), Status.FAILED, SlaAppType.WORKFLOW_JOB);
         // update coordinator action
         new CoordActionUpdateXCommand(workflow, 3).call();
         new WfEndXCommand(wfJob).call(); //To delete the WF temp dir

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,18 +20,19 @@ package org.apache.oozie.command.wf;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.workflow.WorkflowException;
@@ -42,6 +43,7 @@ import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.db.SLADbXOperations;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -55,6 +57,8 @@ public class KillXCommand extends Workfl
     private WorkflowJobBean wfJob;
     private List<WorkflowActionBean> actionList;
     private JPAService jpaService = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public KillXCommand(String wfId) {
         super("kill", "kill", 1);
@@ -106,7 +110,11 @@ public class KillXCommand extends Workfl
         if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
             InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
             wfJob.setStatus(WorkflowJob.Status.KILLED);
-            SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.KILLED, SlaAppType.WORKFLOW_JOB);
+            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
+                    Status.KILLED, SlaAppType.WORKFLOW_JOB);
+            if(slaEvent != null) {
+                insertList.add(slaEvent);
+            }
             try {
                 wfJob.getWorkflowInstance().kill();
             }
@@ -124,7 +132,7 @@ public class KillXCommand extends Workfl
                     action.setPending();
                     action.setStatus(WorkflowActionBean.Status.KILLED);
 
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+                    updateList.add(action);
 
                     queue(new ActionKillXCommand(action.getId(), action.getType()));
                 }
@@ -136,16 +144,21 @@ public class KillXCommand extends Workfl
 
                     action.setStatus(WorkflowActionBean.Status.KILLED);
                     action.resetPending();
-                    SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.KILLED,
-                            SlaAppType.WORKFLOW_ACTION);
-                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
+                            Status.KILLED, SlaAppType.WORKFLOW_ACTION);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
+                    updateList.add(action);
                 }
             }
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+            wfJob.setLastModifiedTime(new Date());
+            updateList.add(wfJob);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             queue(new NotificationXCommand(wfJob));
         }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
         }
         finally {
             if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,25 +17,25 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
 
 public class PurgeXCommand extends WorkflowXCommand<Void> {
     private JPAService jpaService = null;
     private int olderThan;
     private int limit;
-    private List<WorkflowJobBean> jobList = null;
+    private List<? extends JsonBean> jobList = null;
 
     public PurgeXCommand(int olderThan, int limit) {
         super("purge", "purge", 0);
@@ -49,15 +49,11 @@ public class PurgeXCommand extends Workf
 
         int actionDeleted = 0;
         if (jobList != null && jobList.size() != 0) {
-            for (WorkflowJobBean w : jobList) {
-                String wfId = w.getId();
-                try {
-                    jpaService.execute(new WorkflowJobDeleteJPAExecutor(wfId));
-                    actionDeleted += jpaService.execute(new WorkflowActionsDeleteForPurgeJPAExecutor(wfId));
-                }
-                catch (JPAExecutorException e) {
-                    throw new CommandException(e);
-                }
+            try {
+                actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+            }
+            catch (JPAExecutorException je) {
+                throw new CommandException(je);
             }
             LOG.debug("ENDED Workflow-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,7 +20,9 @@ package org.apache.oozie.command.wf;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -36,13 +38,13 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -77,6 +79,8 @@ public class ReRunXCommand extends Workf
     private WorkflowJobBean wfBean;
     private List<WorkflowActionBean> actions;
     private JPAService jpaService;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
 
     private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
     private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
@@ -166,32 +170,36 @@ public class ReRunXCommand extends Workf
             throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
         }
 
-        try {
-            for (int i = 0; i < actions.size(); i++) {
-                if (!nodesToSkip.contains(actions.get(i).getName())) {
-                    jpaService.execute(new WorkflowActionDeleteJPAExecutor(actions.get(i).getId()));
-                    LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
-                }
-                else {
-                    copyActionData(newWfInstance, oldWfInstance);
-                }
+        for (int i = 0; i < actions.size(); i++) {
+            if (!nodesToSkip.contains(actions.get(i).getName())) {
+                deleteList.add(actions.get(i));
+                LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
             }
+            else {
+                copyActionData(newWfInstance, oldWfInstance);
+            }
+        }
 
-            wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
-            wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
-            wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
-            wfBean.setUser(conf.get(OozieClient.USER_NAME));
-            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
-            wfBean.setGroup(group);
-            wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
-            wfBean.setEndTime(null);
-            wfBean.setRun(wfBean.getRun() + 1);
-            wfBean.setStatus(WorkflowJob.Status.PREP);
-            wfBean.setWorkflowInstance(newWfInstance);
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
+        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
+        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
+        wfBean.setUser(conf.get(OozieClient.USER_NAME));
+        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
+        wfBean.setGroup(group);
+        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
+        wfBean.setEndTime(null);
+        wfBean.setRun(wfBean.getRun() + 1);
+        wfBean.setStatus(WorkflowJob.Status.PREP);
+        wfBean.setWorkflowInstance(newWfInstance);
+
+        try {
+            wfBean.setLastModifiedTime(new Date());
+            updateList.add(wfBean);
+            // call JPAExecutor to do the bulk writes
+            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
         }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
+        catch (JPAExecutorException je) {
+            throw new CommandException(je);
         }
 
         return null;

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,20 +17,22 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -45,6 +47,7 @@ public class ResumeXCommand extends Work
     private String id;
     private JPAService jpaService = null;
     private WorkflowJobBean workflow = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     public ResumeXCommand(String id) {
         super("resume", "resume", 1);
@@ -70,7 +73,7 @@ public class ResumeXCommand extends Work
                     // START_MANUAL or END_RETRY or END_MANUAL
                     if (action.isRetryOrManual()) {
                         action.setPendingOnly();
-                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+                        updateList.add(action);
                     }
 
                     if (action.isPending()) {
@@ -102,7 +105,9 @@ public class ResumeXCommand extends Work
                     }
                 }
 
-                jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
+                workflow.setLastModifiedTime(new Date());
+                updateList.add(workflow);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
                 queue(new NotificationXCommand(workflow));
             }
             return null;

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Aug 22 19:32:02 2012
@@ -22,6 +22,7 @@ import org.apache.oozie.client.WorkflowJ
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.ErrorCode;
@@ -145,9 +146,13 @@ public class SignalXCommand extends Work
                 wfJob.setStartTime(new Date());
                 wfJob.setWorkflowInstance(workflowInstance);
                 // 1. Add SLA status event for WF-JOB with status STARTED
+                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
+                        Status.STARTED, SlaAppType.WORKFLOW_JOB);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
                 // 2. Add SLA registration events for all WF_ACTIONS
-                SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB);
-                writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
+                createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
                         .getGroup(), wfJob.getConf());
                 queue(new NotificationXCommand(wfJob));
             }
@@ -195,8 +200,11 @@ public class SignalXCommand extends Work
                     actionToFail.resetPending();
                     actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
                     queue(new NotificationXCommand(wfJob, actionToFail));
-                    SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
-                            SlaAppType.WORKFLOW_ACTION);
+                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
+                            Status.FAILED, SlaAppType.WORKFLOW_ACTION);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
                     updateList.add(actionToFail);
                 }
             }
@@ -221,7 +229,11 @@ public class SignalXCommand extends Work
                 default: // TODO SUSPENDED
                     break;
             }
-            SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB);
+            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
+                    slaStatus, SlaAppType.WORKFLOW_JOB);
+            if(slaEvent != null) {
+                insertList.add(slaEvent);
+            }
             queue(new NotificationXCommand(wfJob));
             if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
@@ -354,7 +366,7 @@ public class SignalXCommand extends Work
     }
 
     @SuppressWarnings("unchecked")
-    private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
+    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
             throws CommandException {
         try {
             Element eWfJob = XmlUtils.parseXml(wfXml);
@@ -366,7 +378,11 @@ public class SignalXCommand extends Work
                     eSla = XmlUtils.parseXml(slaXml);
                     String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
                             action.getAttributeValue("name") + "");
-                    SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group);
+                    SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
+                            SlaAppType.WORKFLOW_ACTION, user, group);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent);
+                    }
                 }
             }
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,6 +20,7 @@ package org.apache.oozie.command.wf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -37,7 +38,8 @@ import org.apache.oozie.util.ParamChecke
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.store.StoreException;
@@ -53,9 +55,11 @@ import org.apache.oozie.service.SchemaSe
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -69,6 +73,7 @@ public class SubmitXCommand extends Work
 
     private Configuration conf;
     private String authToken;
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public SubmitXCommand(Configuration conf, String authToken) {
         super("submit", "submit", 1);
@@ -178,9 +183,15 @@ public class SubmitXCommand extends Work
             // System.out.println("SlaXml :"+ slaXml);
 
             //store.insertWorkflow(workflow);
+            insertList.add(workflow);
             JPAService jpaService = Services.get().get(JPAService.class);
             if (jpaService != null) {
-                jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
+                try {
+                    jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
+                }
+                catch (JPAExecutorException je) {
+                    throw new CommandException(je);
+                }
             }
             else {
                 LOG.error(ErrorCode.E0610);
@@ -223,7 +234,11 @@ public class SubmitXCommand extends Work
         try {
             if (slaXml != null && slaXml.length() > 0) {
                 Element eSla = XmlUtils.parseXml(slaXml);
-                SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log);
+                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id,
+                        SlaAppType.WORKFLOW_JOB, user, group, log);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
             }
         }
         catch (Exception e) {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,20 +17,22 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -44,6 +46,7 @@ public class SuspendXCommand extends Wor
     private final String wfid;
     private WorkflowJobBean wfJobBean;
     private JPAService jpaService;
+    private static List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     public SuspendXCommand(String id) {
         super("suspend", "suspend", 1);
@@ -58,7 +61,9 @@ public class SuspendXCommand extends Wor
         InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
         try {
             suspendJob(this.jpaService, this.wfJobBean, this.wfid, null);
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(this.wfJobBean));
+            this.wfJobBean.setLastModifiedTime(new Date());
+            updateList.add(this.wfJobBean);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
             queue(new NotificationXCommand(this.wfJobBean));
         }
         catch (WorkflowException e) {
@@ -121,7 +126,7 @@ public class SuspendXCommand extends Wor
                 else {
                     action.resetPendingOnly();
                 }
-                jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+                updateList.add(action);
 
             }
         }

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java?rev=1376204&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java Wed Aug 22 19:32:02 2012
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Delete job, its list of actions and return the number of
+ * actions been deleted.
+ */
+public class BulkDeleteForPurgeJPAExecutor implements JPAExecutor<Integer> {
+
+    private Collection<JsonBean> deleteList;
+
+    /**
+     * Initialize the JPAExecutor using the delete list of JSON beans
+     * @param deleteList
+     */
+    public BulkDeleteForPurgeJPAExecutor(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    public BulkDeleteForPurgeJPAExecutor() {
+    }
+
+    /**
+     * Sets the delete list for JSON bean
+     *
+     * @param deleteList
+     */
+    public void setDeleteList(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkDeleteForPurgeJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Integer execute(EntityManager em) throws JPAExecutorException {
+        int actionsDeleted = 0;
+        try {
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (deleteList != null) {
+                for (JsonBean entity : deleteList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    // deleting the job (wf/coord/bundle)
+                    em.remove(em.merge(entity));
+                    if (entity instanceof WorkflowJobBean) {
+                        // deleting the workflow actions for this job
+                        Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
+                        g.setParameter("wfId", ((WorkflowJobBean) entity).getId());
+                        actionsDeleted = g.executeUpdate();
+                    }
+                    else if (entity instanceof CoordinatorJobBean) {
+                        // deleting the coord actions for this job
+                        Query g = em.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
+                        g.setParameter("jobId", ((CoordinatorJobBean) entity).getId());
+                        actionsDeleted = g.executeUpdate();
+                    }
+                    else if (entity instanceof BundleJobBean) {
+                        // deleting the bundle actions for this job
+                        Query g = em.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_BUNDLE");
+                        g.setParameter("bundleId", ((BundleJobBean) entity).getId());
+                        actionsDeleted = g.executeUpdate();
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e);
+        }
+        return actionsDeleted;
+    }
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java?rev=1376204&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java Wed Aug 22 19:32:02 2012
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Class for updating and deleting beans in bulk
+ */
+public class BulkUpdateDeleteJPAExecutor implements JPAExecutor<Void> {
+
+    private Collection<JsonBean> updateList;
+    private Collection<JsonBean> deleteList;
+    private boolean forRerun = true;
+
+    /**
+     * Initialize the JPAExecutor using the update and delete list of JSON beans
+     * @param deleteList
+     * @param updateList
+     */
+    public BulkUpdateDeleteJPAExecutor(Collection<JsonBean> updateList, Collection<JsonBean> deleteList,
+            boolean forRerun) {
+        this.updateList = updateList;
+        this.deleteList = deleteList;
+        this.forRerun = forRerun;
+    }
+
+    public BulkUpdateDeleteJPAExecutor() {
+    }
+
+    /**
+     * Sets the update list for JSON bean
+     *
+     * @param updateList
+     */
+    public void setUpdateList(Collection<JsonBean> updateList) {
+        this.updateList = updateList;
+    }
+
+    /**
+     * Sets the delete list for JSON bean
+     *
+     * @param deleteList
+     */
+    public void setDeleteList(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    /**
+     * Sets whether for RerunX command or no. Else it'd be for ChangeX
+     *
+     * @param forRerun
+     */
+    public void setForRerun(boolean forRerun) {
+        this.forRerun = forRerun;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkUpdateDeleteJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            if (updateList != null) {
+                for (JsonBean entity : updateList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.merge(entity);
+                }
+            }
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (deleteList != null) {
+                for (JsonBean entity : deleteList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    if (forRerun) {
+                        em.remove(em.merge(entity));
+                    }
+                    else {
+                        Query g = em.createNamedQuery("DELETE_UNSCHEDULED_ACTION");
+                        String coordActionId = ((CoordinatorActionBean) entity).getId();
+                        g.setParameter("id", coordActionId);
+                        int actionsDeleted = g.executeUpdate();
+                        if (actionsDeleted == 0)
+                            throw new JPAExecutorException(ErrorCode.E1022, coordActionId);
+                    }
+                }
+            }
+            return null;
+        }
+        catch (JPAExecutorException je) {
+            throw je;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e);
+        }
+    }
+}