You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 21:32:05 UTC

svn commit: r1376204 [1/4] - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src...

Author: virag
Date: Wed Aug 22 19:32:02 2012
New Revision: 1376204

URL: http://svn.apache.org/viewvc?rev=1376204&view=rev
Log:
OOZIE-914 Make sure all commands do their JPA writes within a single JPA executor (mona via virag)

Added:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java
Modified:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/PurgeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobDeleteJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForStartJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionByActionNumberJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/SLAStore.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/db/SLADbOperations.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/db/SLADbXOperations.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobDeleteJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForStartJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionsDeleteForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleActionBean.java Wed Aug 22 19:32:02 2012
@@ -35,9 +35,10 @@ import javax.persistence.Table;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.WritableUtils;
-import org.apache.openjpa.persistence.jdbc.Index;
+import org.json.simple.JSONObject;
 
 @Entity
 @Table(name = "BUNDLE_ACTIONS")
@@ -72,7 +73,7 @@ import org.apache.openjpa.persistence.jd
         @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"),
 
         @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED' OR a.status = 'DONEWITHERROR')")})
-public class BundleActionBean implements Writable {
+public class BundleActionBean implements Writable, JsonBean {
 
     @Id
     @Column(name = "bundle_action_id")
@@ -364,4 +365,14 @@ public class BundleActionBean implements
             setLastModifiedTime(new Date(d));
         }
     }
+
+    @Override
+    public JSONObject toJSONObject() {
+        return null;
+    }
+
+    @Override
+    public JSONObject toJSONObject(String timeZoneId) {
+        return null;
+    }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/KillTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -51,6 +51,7 @@ public abstract class KillTransitionXCom
             transitToNext();
             killChildren();
             updateJob();
+            performWrites();
         }
         finally {
             notifyParent();

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -71,6 +71,7 @@ public abstract class MaterializeTransit
         try {
             materialize();
             updateJob();
+            performWrites();
         } finally {
             notifyParent();
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/RerunTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,7 +18,6 @@
 package org.apache.oozie.command;
 
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.util.StatusUtils;
 
@@ -96,6 +95,7 @@ public abstract class RerunTransitionXCo
             transitToNext();
             rerunChildren();
             updateJob();
+            performWrites();
         }
         finally {
             notifyParent();

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/ResumeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -67,6 +67,7 @@ public abstract class ResumeTransitionXC
         try {
             resumeChildren();
             updateJob();
+            performWrites();
         } finally {
             notifyParent();
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/StartTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -79,6 +79,7 @@ public abstract class StartTransitionXCo
         transitToNext();
         updateJob();
         StartChildren();
+        performWrites();
         notifyParent();
         return null;
     }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/SuspendTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -73,6 +73,7 @@ public abstract class SuspendTransitionX
         try {
             suspendChildren();
             updateJob();
+            performWrites();
         } finally {
             notifyParent();
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,7 +17,11 @@
  */
 package org.apache.oozie.command;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.util.ParamChecker;
 
 /**
@@ -29,6 +33,8 @@ import org.apache.oozie.util.ParamChecke
 public abstract class TransitionXCommand<T> extends XCommand<T> {
 
     protected Job job;
+    protected List<JsonBean> updateList = new ArrayList<JsonBean>();
+    protected List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public TransitionXCommand(String name, String type, int priority) {
         super(name, type, priority);
@@ -59,6 +65,13 @@ public abstract class TransitionXCommand
      */
     public abstract void notifyParent() throws CommandException;
 
+    /**
+     * This will be used to perform atomically all the writes within this command.
+     *
+     * @throws CommandException
+     */
+    public abstract void performWrites() throws CommandException;
+
     /* (non-Javadoc)
      * @see org.apache.oozie.command.XCommand#execute()
      */

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.command.bundle;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -29,14 +30,14 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.XCommand;
 import org.apache.oozie.command.coord.CoordChangeXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
@@ -55,6 +56,7 @@ public class BundleJobChangeXCommand ext
     private Date newEndTime = null;
     boolean isChangePauseTime = false;
     boolean isChangeEndTime = false;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
     static {
@@ -179,10 +181,11 @@ public class BundleJobChangeXCommand ext
                         LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
                                 + changeValue);
                         action.setPending(action.getPending() + 1);
-                        jpaService.execute(new BundleActionUpdateJPAExecutor(action));
+                        updateList.add(action);
                     }
                 }
-                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+                updateList.add(bundleJob);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
             }
             return null;
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java Wed Aug 22 19:32:02 2012
@@ -23,16 +23,14 @@ import java.util.List;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.XException;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.ResumeTransitionXCommand;
 import org.apache.oozie.command.coord.CoordResumeXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -61,33 +59,28 @@ public class BundleJobResumeXCommand ext
      * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
      */
     @Override
-    public void resumeChildren() throws CommandException {
-        try {
-            for (BundleActionBean action : bundleActions) {
-                if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
-                    // queue a CoordResumeXCommand
-                    if (action.getCoordId() != null) {
-                        queue(new CoordResumeXCommand(action.getCoordId()));
-                        updateBundleAction(action);
-                        LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
-                                        action.getBundleActionId(), action.getStatus(), action.getPending(), action
-                                                .getCoordId());
-                    }
-                    else {
-                        updateBundleAction(action);
-                        LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
-                                        action.getBundleActionId(), action.getStatus(), action.getPending());
-                    }
+    public void resumeChildren() {
+        for (BundleActionBean action : bundleActions) {
+            if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
+                // queue a CoordResumeXCommand
+                if (action.getCoordId() != null) {
+                    queue(new CoordResumeXCommand(action.getCoordId()));
+                    updateBundleAction(action);
+                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
+                                    action.getBundleActionId(), action.getStatus(), action.getPending(), action
+                                            .getCoordId());
+                }
+                else {
+                    updateBundleAction(action);
+                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
+                                    action.getBundleActionId(), action.getStatus(), action.getPending());
                 }
             }
-            LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
-        }
-        catch (XException ex) {
-            throw new CommandException(ex);
         }
+        LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
     }
 
-    private void updateBundleAction(BundleActionBean action) throws CommandException {
+    private void updateBundleAction(BundleActionBean action) {
         if (action.getStatus() == Job.Status.PREPSUSPENDED) {
             action.setStatus(Job.Status.PREP);
         }
@@ -99,12 +92,7 @@ public class BundleJobResumeXCommand ext
         }
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        try {
-            jpaService.execute(new BundleActionUpdateJPAExecutor(action));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(action);
     }
 
     /* (non-Javadoc)
@@ -119,13 +107,21 @@ public class BundleJobResumeXCommand ext
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
+    public void updateJob() {
         InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
         bundleJob.setSuspendedTime(null);
         bundleJob.setLastModifiedTime(new Date());
         LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
+        updateList.add(bundleJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobSuspendXCommand.java Wed Aug 22 19:32:02 2012
@@ -23,16 +23,14 @@ import java.util.List;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.XException;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.SuspendTransitionXCommand;
 import org.apache.oozie.command.coord.CoordSuspendXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -74,6 +72,19 @@ public class BundleJobSuspendXCommand ex
     }
 
     /* (non-Javadoc)
+     * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
+        try {
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /* (non-Javadoc)
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
@@ -133,49 +144,39 @@ public class BundleJobSuspendXCommand ex
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
+    public void updateJob() {
         InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
         bundleJob.setSuspendedTime(new Date());
         bundleJob.setLastModifiedTime(new Date());
 
         LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
-        try {
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(bundleJob);
     }
 
     @Override
     public void suspendChildren() throws CommandException {
-        try {
-            for (BundleActionBean action : this.bundleActions) {
-                if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
-                        || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
-                        || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
-                    // queue a CoordSuspendXCommand
-                    if (action.getCoordId() != null) {
-                        queue(new CoordSuspendXCommand(action.getCoordId()));
-                        updateBundleAction(action);
-                        LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
-                                action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
-                    } else {
-                        updateBundleAction(action);
-                        LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
-                                action.getBundleActionId(), action.getStatus(), action.getPending());
-                    }
-
+        for (BundleActionBean action : this.bundleActions) {
+            if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
+                    || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
+                    || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
+                // queue a CoordSuspendXCommand
+                if (action.getCoordId() != null) {
+                    queue(new CoordSuspendXCommand(action.getCoordId()));
+                    updateBundleAction(action);
+                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
+                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
+                } else {
+                    updateBundleAction(action);
+                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
+                            action.getBundleActionId(), action.getStatus(), action.getPending());
                 }
+
             }
-            LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
-        }
-        catch (XException ex) {
-            throw new CommandException(ex);
         }
+        LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
     }
 
-    private void updateBundleAction(BundleActionBean action) throws CommandException {
+    private void updateBundleAction(BundleActionBean action) {
         if (action.getStatus() == Job.Status.PREP) {
             action.setStatus(Job.Status.PREPSUSPENDED);
         }
@@ -194,11 +195,6 @@ public class BundleJobSuspendXCommand ex
 
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        try {
-            jpaService.execute(new BundleActionUpdateJPAExecutor(action));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(action);
     }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -30,10 +30,9 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.KillTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordKillXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -134,16 +133,11 @@ public class BundleKillXCommand extends 
      * @param action
      * @throws CommandException
      */
-    private void updateBundleAction(BundleActionBean action) throws CommandException {
+    private void updateBundleAction(BundleActionBean action) {
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
         action.setStatus(Job.Status.KILLED);
-        try {
-            jpaService.execute(new BundleActionUpdateJPAExecutor(action));
-        }
-        catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(action);
     }
 
     /* (non-Javadoc)
@@ -165,9 +159,17 @@ public class BundleKillXCommand extends 
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
+    public void updateJob() {
+        updateList.add(bundleJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -102,4 +102,8 @@ public class BundlePauseXCommand extends
 
     }
 
+    @Override
+    public void performWrites() throws CommandException {
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,21 +17,20 @@
  */
 package org.apache.oozie.command.bundle;
 
+import java.util.Collection;
 import java.util.List;
 
-import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.XCommand;
-import org.apache.oozie.executor.jpa.BundleActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.util.XLog;
 
 /**
  * This class is used for bundle purge command
@@ -40,7 +39,7 @@ public class BundlePurgeXCommand extends
     private JPAService jpaService = null;
     private final int olderThan;
     private final int limit;
-    private List<BundleJobBean> jobList = null;
+    private List<? extends JsonBean> jobList = null;
 
     public BundlePurgeXCommand(int olderThan, int limit) {
         super("bundle_purge", "bundle_purge", 0);
@@ -77,15 +76,11 @@ public class BundlePurgeXCommand extends
 
         int actionDeleted = 0;
         if (jobList != null && jobList.size() != 0) {
-            for (BundleJobBean bundle : jobList) {
-                String jobId = bundle.getId();
-                try {
-                    jpaService.execute(new BundleJobDeleteJPAExecutor(jobId));
-                    actionDeleted += jpaService.execute(new BundleActionsDeleteForPurgeJPAExecutor(jobId));
-                }
-                catch (JPAExecutorException e) {
-                    throw new CommandException(e);
-                }
+            try {
+                actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+            }
+            catch (JPAExecutorException je) {
+                throw new CommandException(je);
             }
             LOG.debug("ENDED Bundle-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java Wed Aug 22 19:32:02 2012
@@ -32,10 +32,9 @@ import org.apache.oozie.client.rest.Rest
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.RerunTransitionXCommand;
 import org.apache.oozie.command.coord.CoordRerunXCommand;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
@@ -195,42 +194,44 @@ public class BundleRerunXCommand extends
      * @param action the bundle action
      * @throws CommandException thrown if failed to update bundle action
      */
-    private void updateBundleAction(BundleActionBean action) throws CommandException {
+    private void updateBundleAction(BundleActionBean action) {
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        try {
-            jpaService.execute(new BundleActionUpdateJPAExecutor(action));
-        }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
-        }
+        updateList.add(action);
     }
 
     /* (non-Javadoc)
      * @see org.apache.oozie.command.TransitionXCommand#updateJob()
      */
     @Override
-    public void updateJob() throws CommandException {
-        try {
-            // rerun a paused bundle job will keep job status at paused and pending at previous pending
-            if (getPrevStatus() != null) {
-                Job.Status bundleJobStatus = getPrevStatus();
-                if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
-                    bundleJob.setStatus(bundleJobStatus);
-                    if (prevPending) {
-                        bundleJob.setPending();
-                    }
-                    else {
-                        bundleJob.resetPending();
-                    }
+    public void updateJob() {
+        // rerun a paused bundle job will keep job status at paused and pending at previous pending
+        if (getPrevStatus() != null) {
+            Job.Status bundleJobStatus = getPrevStatus();
+            if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
+                bundleJob.setStatus(bundleJobStatus);
+                if (prevPending) {
+                    bundleJob.setPending();
+                }
+                else {
+                    bundleJob.resetPending();
                 }
             }
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
-        }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
         }
+        updateList.add(bundleJob);
+    }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
+        try {
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
     }
 
     /* (non-Javadoc)

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -32,13 +32,12 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.StartTransitionXCommand;
 import org.apache.oozie.command.coord.CoordSubmitXCommand;
-import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -166,6 +165,19 @@ public class BundleStartXCommand extends
     public void notifyParent() {
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.StartTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
+        try {
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
     /**
      * Insert bundle actions
      *
@@ -200,26 +212,25 @@ public class BundleStartXCommand extends
                 throw new CommandException(ErrorCode.E1301, jex);
             }
 
-            try {
-                // if there is no coordinator for this bundle, failed it.
-                if (map.isEmpty()) {
-                    bundleJob.setStatus(Job.Status.FAILED);
-                    bundleJob.resetPending();
+            // if there is no coordinator for this bundle, failed it.
+            if (map.isEmpty()) {
+                bundleJob.setStatus(Job.Status.FAILED);
+                bundleJob.resetPending();
+                try {
                     jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
-                    LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
-                    throw new CommandException(ErrorCode.E1318, jobId);
                 }
-
-                for (Entry<String, Boolean> coordName : map.entrySet()) {
-                    BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
-
-                    jpaService.execute(new BundleActionInsertJPAExecutor(action));
+                catch (JPAExecutorException jex) {
+                    throw new CommandException(jex);
                 }
-            }
-            catch (JPAExecutorException je) {
-                throw new CommandException(je);
+
+                LOG.debug("No coord jobs for the bundle=[{0}], failed it!!", jobId);
+                throw new CommandException(ErrorCode.E1318, jobId);
             }
 
+            for (Entry<String, Boolean> coordName : map.entrySet()) {
+                BundleActionBean action = createBundleAction(jobId, coordName.getKey(), coordName.getValue());
+                insertList.add(action);
+            }
         }
         else {
             throw new CommandException(ErrorCode.E0604, jobId);
@@ -260,8 +271,8 @@ public class BundleStartXCommand extends
 
                     queue(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
 
-                    updateBundleAction(name.getValue());
                 }
+                updateBundleAction();
             }
             catch (JDOMException jex) {
                 throw new CommandException(ErrorCode.E1301, jex);
@@ -275,11 +286,12 @@ public class BundleStartXCommand extends
         }
     }
 
-    private void updateBundleAction(String coordName) throws JPAExecutorException {
-        BundleActionBean action = jpaService.execute(new BundleActionGetJPAExecutor(jobId, coordName));
-        action.incrementAndGetPending();
-        action.setLastModifiedTime(new Date());
-        jpaService.execute(new BundleActionUpdateJPAExecutor(action));
+    private void updateBundleAction() throws JPAExecutorException {
+        for(JsonBean bAction : insertList) {
+            BundleActionBean action = (BundleActionBean) bAction;
+            action.incrementAndGetPending();
+            action.setLastModifiedTime(new Date());
+        }
     }
 
     /**
@@ -348,11 +360,6 @@ public class BundleStartXCommand extends
      */
     @Override
     public void updateJob() throws CommandException {
-        try {
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
-        }
-        catch (JPAExecutorException je) {
-            throw new CommandException(je);
-        }
+        updateList.add(bundleJob);
     }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java Wed Aug 22 19:32:02 2012
@@ -20,7 +20,6 @@ package org.apache.oozie.command.bundle;
 import java.util.Date;
 
 import org.apache.oozie.BundleActionBean;
-import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
@@ -31,8 +30,6 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.StatusUpdateXCommand;
 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleSubmitXCommand.java Wed Aug 22 19:32:02 2012
@@ -521,4 +521,8 @@ public class BundleSubmitXCommand extend
     @Override
     public void updateJob() throws CommandException {
     }
+
+    @Override
+    public void performWrites() throws CommandException {
+    }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -116,4 +116,8 @@ public class BundleUnpauseXCommand exten
 
     }
 
+    @Override
+    public void performWrites() throws CommandException {
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Aug 22 19:32:02 2012
@@ -18,10 +18,13 @@
 package org.apache.oozie.command.coord;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.service.JPAService;
@@ -34,8 +37,10 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 
@@ -47,6 +52,8 @@ public class CoordActionCheckXCommand ex
     private int actionCheckDelay;
     private CoordinatorActionBean coordAction = null;
     private JPAService jpaService = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public CoordActionCheckXCommand(String actionId, int actionCheckDelay) {
         super("coord_action_check", "coord_action_check", 0);
@@ -88,7 +95,8 @@ public class CoordActionCheckXCommand ex
                     else {
                         LOG.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus());
                         coordAction.setLastModifiedTime(new Date());
-                        jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
+                        updateList.add(coordAction);
+                        jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
                         return null;
                     }
                 }
@@ -97,12 +105,17 @@ public class CoordActionCheckXCommand ex
             LOG.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to ="
                             + coordAction.getStatus());
             coordAction.setLastModifiedTime(new Date());
-            jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
+            updateList.add(coordAction);
 
             if (slaStatus != null) {
-                SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
+                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                         SlaAppType.COORDINATOR_ACTION, LOG);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
             }
+
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
         }
         catch (XException ex) {
             LOG.warn("CoordActionCheckCommand Failed ", ex);

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java Wed Aug 22 19:32:02 2012
@@ -19,18 +19,26 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.store.CoordinatorStore;
@@ -52,22 +60,31 @@ public class CoordActionMaterializeComma
     private final XLog log = XLog.getLog(getClass());
     private String user;
     private String group;
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+
     /**
      * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
      */
     public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
 
     public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
-        super("coord_action_mater", "coord_action_mater", 1, XLog.STD);
+        super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false);
         this.jobId = jobId;
         this.startTime = startTime;
         this.endTime = endTime;
     }
 
     @Override
-    protected Void call(CoordinatorStore store) throws StoreException, CommandException {
-        // CoordinatorJobBean job = store.getCoordinatorJob(jobId, true);
-        CoordinatorJobBean job = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
+    protected Void call(CoordinatorStore store) throws CommandException {
+        CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId);
+        CoordinatorJobBean job;
+        try {
+            job = Services.get().get(JPAService.class).execute(getCoordJob);
+        }
+        catch (JPAExecutorException jex) {
+            throw new CommandException(jex);
+        }
         setLogInfo(job);
         if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
             log.info("ENDED Coordinator materialization for jobId = " + jobId
@@ -88,7 +105,7 @@ public class CoordActionMaterializeComma
             if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
                 job.setStatus(CoordinatorJob.Status.RUNNING);
             }
-            store.updateCoordinatorJob(job);
+            updateList.add(job);
             return null;
         }
 
@@ -115,7 +132,7 @@ public class CoordActionMaterializeComma
             catch (CommandException ex) {
                 log.warn("Exception occurs:" + ex + " Making the job failed ");
                 job.setStatus(CoordinatorJobBean.Status.FAILED);
-                store.updateCoordinatorJob(job);
+                updateList.add(job);
             }
             catch (Exception e) {
                 log.error("Excepion thrown :", e);
@@ -234,8 +251,8 @@ public class CoordActionMaterializeComma
     private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store) throws Exception {
         log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
         actionBean.setActionXml(actionXml);
-        store.insertCoordinatorAction(actionBean);
-        writeActionRegistration(actionXml, actionBean, store);
+        insertList.add(actionBean);
+        createActionRegistration(actionXml, actionBean, store);
 
         // TODO: time 100s should be configurable
         queueCallable(new CoordActionNotificationXCommand(actionBean), 100);
@@ -248,12 +265,15 @@ public class CoordActionMaterializeComma
      * @param store
      * @throws Exception
      */
-    private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
+    private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
             throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml);
         Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
-        SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user,
-                                                  group);
+        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(),
+                SlaAppType.COORDINATOR_ACTION, user, group);
+        if(slaEvent != null) {
+            insertList.add(slaEvent);
+        }
     }
 
     /**
@@ -261,7 +281,7 @@ public class CoordActionMaterializeComma
      * @param store
      * @throws StoreException
      */
-    private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) throws StoreException {
+    private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) {
         // TODO: why do we need this? Isn't lastMatTime enough???
         job.setLastActionTime(endTime);
         job.setLastActionNumber(lastActionNumber);
@@ -278,7 +298,7 @@ public class CoordActionMaterializeComma
             log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
         }
         job.setNextMaterializedTime(endTime);
-        store.updateCoordinatorJob(job);
+        updateList.add(job);
     }
 
     @Override
@@ -288,6 +308,18 @@ public class CoordActionMaterializeComma
         try {
             if (lock(jobId)) {
                 call(store);
+                JPAService jpaService = Services.get().get(JPAService.class);
+                if (jpaService != null) {
+                    try {
+                        jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+                    }
+                    catch (JPAExecutorException je) {
+                        throw new CommandException(je);
+                    }
+                }
+                else {
+                    throw new CommandException(ErrorCode.E0610);
+                }
             }
             else {
                 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Aug 22 19:32:02 2012
@@ -25,6 +25,7 @@ import org.apache.oozie.CoordinatorActio
 import org.apache.oozie.DagEngineException;
 import org.apache.oozie.DagEngine;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
@@ -40,15 +41,18 @@ import org.apache.oozie.util.XConfigurat
 import org.apache.oozie.util.db.SLADbOperations;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
-
 import org.jdom.Element;
 import org.jdom.JDOMException;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> {
 
@@ -65,6 +69,8 @@ public class CoordActionStartXCommand ex
     private CoordinatorActionBean coordAction = null;
     private JPAService jpaService = null;
     private String jobId = null;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public CoordActionStartXCommand(String id, String user, String token, String jobId) {
         //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
@@ -160,8 +166,11 @@ public class CoordActionStartXCommand ex
             try {
                 boolean startJob = true;
                 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
-                SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
-                                                SlaAppType.COORDINATOR_ACTION, log);
+                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
+                        SlaAppType.COORDINATOR_ACTION, log);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
 
                 // Normalize workflow appPath here;
                 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
@@ -176,8 +185,15 @@ public class CoordActionStartXCommand ex
                     log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
                     WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
                     wfJob.setParentId(actionId);
-                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
-                    jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
+                    wfJob.setLastModifiedTime(new Date());
+                    updateList.add(wfJob);
+                    updateList.add(coordAction);
+                    try {
+                        jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+                    }
+                    catch (JPAExecutorException je) {
+                        throw new CommandException(je);
+                    }
                 }
                 else {
                     log.error(ErrorCode.E0610);
@@ -215,20 +231,22 @@ public class CoordActionStartXCommand ex
                     coordAction.setErrorMessage(errMsg);
                     coordAction.setErrorCode(errCode);
 
-                    JPAService jpaService = Services.get().get(JPAService.class);
-                    if (jpaService != null) {
-                        try {
-                            jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
-                        }
-                        catch (JPAExecutorException je) {
-                            throw new CommandException(je);
-                        }
+                    updateList = new ArrayList<JsonBean>();
+                    updateList.add(coordAction);
+                    insertList = new ArrayList<JsonBean>();
+
+                    SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
+                            SlaAppType.COORDINATOR_ACTION, log);
+                    if(slaEvent != null) {
+                        insertList.add(slaEvent); //Update SLA events
+                    }
+                    try {
+                        // call JPAExecutor to do the bulk writes
+                        jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
                     }
-                    else {
-                        log.error(ErrorCode.E0610);
+                    catch (JPAExecutorException je) {
+                        throw new CommandException(je);
                     }
-                    SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
-                            SlaAppType.COORDINATOR_ACTION, log); //Update SLA events
                     queue(new CoordActionReadyXCommand(coordAction.getJobId()));
                 }
             }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,11 +17,13 @@
  */
 package org.apache.oozie.command.coord;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.service.JPAService;
@@ -32,13 +34,11 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 
 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@@ -46,6 +46,8 @@ public class CoordActionUpdateXCommand e
     private CoordinatorActionBean coordAction = null;
     private JPAService jpaService = null;
     private int maxRetries = 1;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public CoordActionUpdateXCommand(WorkflowJobBean workflow) {
         super("coord-action-update", "coord-action-update", 1);
@@ -93,7 +95,8 @@ public class CoordActionUpdateXCommand e
                 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
                 // update lastModifiedTime
                 coordAction.setLastModifiedTime(new Date());
-                jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction));
+                updateList.add(coordAction);
+                jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
                 // TODO - Uncomment this when bottom up rerun can change terminal state
                 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
                 if (!coordJob.isPending()) {
@@ -107,7 +110,7 @@ public class CoordActionUpdateXCommand e
                     + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
 
             coordAction.setLastModifiedTime(new Date());
-            jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction));
+            updateList.add(coordAction);
             // TODO - Uncomment this when bottom up rerun can change terminal state
             /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
             if (!coordJob.isPending()) {
@@ -116,13 +119,19 @@ public class CoordActionUpdateXCommand e
                 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
             }*/
             if (slaStatus != null) {
-                SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
+                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                         SlaAppType.COORDINATOR_ACTION, LOG);
+                if(slaEvent != null) {
+                    insertList.add(slaEvent);
+                }
             }
             if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
                     && workflow.getStatus() != WorkflowJob.Status.RUNNING) {
                 queue(new CoordActionReadyXCommand(coordAction.getJobId()));
             }
+
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+
             LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
         }
         catch (XException ex) {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,27 +17,31 @@
  */
 package org.apache.oozie.command.coord;
 
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -57,6 +61,8 @@ public class CoordChangeXCommand extends
     private CoordinatorJobBean coordJob;
     private JPAService jpaService = null;
     private Job.Status prevStatus;
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
 
     private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
     static {
@@ -250,7 +256,8 @@ public class CoordChangeXCommand extends
     private void deleteAction(int actionNum) throws CommandException {
         try {
             String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
-            jpaService.execute(new CoordActionRemoveJPAExecutor(actionId));
+            CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            deleteList.add(bean);
         } catch (JPAExecutorException e) {
             throw new CommandException(e);
         }
@@ -340,7 +347,8 @@ public class CoordChangeXCommand extends
                 coordJob.setDoneMaterialization();
             }
 
-            jpaService.execute(new CoordJobUpdateJPAExecutor(this.coordJob));
+            updateList.add(coordJob);
+            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
 
             return null;
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Wed Aug 22 19:32:02 2012
@@ -28,10 +28,9 @@ import org.apache.oozie.command.wf.KillX
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.KillTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -102,7 +101,7 @@ public class CoordKillXCommand extends K
         }
     }
 
-    private void updateCoordAction(CoordinatorActionBean action, boolean makePending) throws CommandException {
+    private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
         action.setStatus(CoordinatorActionBean.Status.KILLED);
         if (makePending) {
             action.incrementAndGetPending();
@@ -111,43 +110,34 @@ public class CoordKillXCommand extends K
             action.setPending(0);
         }
         action.setLastModifiedTime(new Date());
-        try {
-            jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
-        } catch (JPAExecutorException e) {
-            throw new CommandException(e);
-        }
+        updateList.add(action);
     }
 
     @Override
     public void killChildren() throws CommandException {
-        try {
-            if (actionList != null) {
-                for (CoordinatorActionBean action : actionList) {
-                    // queue a WorkflowKillXCommand to delete the workflow job and actions
-                    if (action.getExternalId() != null) {
-                        queue(new KillXCommand(action.getExternalId()));
-                        // As the kill command for children is queued, set pending flag for coord action to be true
-                        updateCoordAction(action, true);
-                        LOG.debug(
-                                "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
-                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
-                    }
-                    else {
-                        // As killing children is not required, set pending flag for coord action to be false
-                        updateCoordAction(action, false);
-                        LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
-                                action.getId(), action.getStatus(), action.getPending());
-                    }
+        if (actionList != null) {
+            for (CoordinatorActionBean action : actionList) {
+                // queue a WorkflowKillXCommand to delete the workflow job and actions
+                if (action.getExternalId() != null) {
+                    queue(new KillXCommand(action.getExternalId()));
+                    // As the kill command for children is queued, set pending flag for coord action to be true
+                    updateCoordAction(action, true);
+                    LOG.debug(
+                            "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
+                            action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
+                }
+                else {
+                    // As killing children is not required, set pending flag for coord action to be false
+                    updateCoordAction(action, false);
+                    LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
+                            action.getId(), action.getStatus(), action.getPending());
                 }
             }
+        }
 
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        updateList.add(coordJob);
 
-            LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
-        }
-        catch (JPAExecutorException ex) {
-            throw new CommandException(ex);
-        }
+        LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
     }
 
     @Override
@@ -161,11 +151,19 @@ public class CoordKillXCommand extends K
 
     @Override
     public void updateJob() throws CommandException {
+        updateList.add(coordJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
         }
-        catch (JPAExecutorException ex) {
-            throw new CommandException(ex);
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
         }
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Aug 22 19:32:02 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -36,10 +37,9 @@ import org.apache.oozie.command.Material
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
@@ -96,8 +96,16 @@ public class CoordMaterializeTransitionX
      */
     @Override
     public void updateJob() throws CommandException {
+        updateList.add(coordJob);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
+     */
+    @Override
+    public void performWrites() throws CommandException {
         try {
-            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);
@@ -345,7 +353,7 @@ public class CoordMaterializeTransitionX
                 + actionXml.length());
         actionBean.setActionXml(actionXml);
 
-        jpaService.execute(new CoordActionInsertJPAExecutor(actionBean));
+        insertList.add(actionBean);
         writeActionRegistration(actionXml, actionBean);
 
         // TODO: time 100s should be configurable
@@ -356,8 +364,11 @@ public class CoordMaterializeTransitionX
     private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml);
         Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
-        SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
+        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
                 .getUser(), coordJob.getGroup(), LOG);
+        if(slaEvent != null) {
+            insertList.add(slaEvent);
+        }
     }
 
     private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Wed Aug 22 19:32:02 2012
@@ -111,4 +111,8 @@ public class CoordPauseXCommand extends 
 
     }
 
+    @Override
+    public void performWrites() throws CommandException {
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java?rev=1376204&r1=1376203&r2=1376204&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPurgeXCommand.java Wed Aug 22 19:32:02 2012
@@ -17,15 +17,15 @@
  */
 package org.apache.oozie.command.coord;
 
+import java.util.Collection;
 import java.util.List;
 
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.CoordActionsDeleteForPurgeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.BulkDeleteForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
@@ -38,7 +38,7 @@ public class CoordPurgeXCommand extends 
     private JPAService jpaService = null;
     private final int olderThan;
     private final int limit;
-    private List<CoordinatorJobBean> jobList = null;
+    private List<? extends JsonBean> jobList = null;
 
     public CoordPurgeXCommand(int olderThan, int limit) {
         super("coord_purge", "coord_purge", 0);
@@ -55,15 +55,11 @@ public class CoordPurgeXCommand extends 
 
         int actionDeleted = 0;
         if (jobList != null && jobList.size() != 0) {
-            for (CoordinatorJobBean coord : jobList) {
-                String jobId = coord.getId();
-                try {
-                    jpaService.execute(new CoordJobDeleteJPAExecutor(jobId));
-                    actionDeleted += jpaService.execute(new CoordActionsDeleteForPurgeJPAExecutor(jobId));
-                }
-                catch (JPAExecutorException e) {
-                    throw new CommandException(e);
-                }
+            try {
+                actionDeleted = jpaService.execute(new BulkDeleteForPurgeJPAExecutor((Collection<JsonBean>) jobList));
+            }
+            catch (JPAExecutorException je) {
+                throw new CommandException(je);
             }
             LOG.debug("ENDED Coord-Purge deleted jobs :" + jobList.size() + " and actions " + actionDeleted);
         }