You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:29:33 UTC

svn commit: r1520828 [2/5] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Sun Sep  8 02:29:31 2013
@@ -30,12 +30,9 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.KillTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -127,7 +124,7 @@ public class CoordKillXCommand extends K
             CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
         }
         action.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,action));
+        updateList.add(action);
     }
 
     @Override
@@ -157,6 +154,8 @@ public class CoordKillXCommand extends K
             }
         }
         coordJob.setDoneMaterialization();
+        updateList.add(coordJob);
+
         LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
     }
 
@@ -171,13 +170,13 @@ public class CoordKillXCommand extends K
 
     @Override
     public void updateJob() throws CommandException {
-        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
+        updateList.add(coordJob);
     }
 
     @Override
     public void performWrites() throws CommandException {
         try {
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Sun Sep  8 02:29:31 2013
@@ -39,12 +39,10 @@ import org.apache.oozie.command.Material
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -104,7 +102,7 @@ public class CoordMaterializeTransitionX
      */
     @Override
     public void updateJob() throws CommandException {
-        updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
+        updateList.add(coordJob);
     }
 
     /* (non-Javadoc)
@@ -113,7 +111,7 @@ public class CoordMaterializeTransitionX
     @Override
     public void performWrites() throws CommandException {
         try {
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             // register the partition related dependencies of actions
             for (JsonBean actionBean : insertList) {
                 if (actionBean instanceof CoordinatorActionBean) {
@@ -279,7 +277,7 @@ public class CoordMaterializeTransitionX
             LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
             coordJob.setStatus(Job.Status.FAILED);
             try {
-                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob);
+                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
             }
             catch (JPAExecutorException jex) {
                 throw new CommandException(ErrorCode.E1011, jex);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java Sun Sep  8 02:29:31 2013
@@ -24,12 +24,14 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.PauseTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
 
 public class CoordPauseXCommand extends PauseTransitionXCommand {
+    private final JPAService jpaService = Services.get().get(JPAService.class);
     private final CoordinatorJobBean coordJob;
     private CoordinatorJob.Status prevStatus = null;
 
@@ -96,7 +98,7 @@ public class CoordPauseXCommand extends 
     @Override
     public void updateJob() throws CommandException {
         try {
-            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coordJob);
+            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Sun Sep  8 02:29:31 2013
@@ -37,8 +37,8 @@ import org.apache.oozie.dependency.Depen
 import org.apache.oozie.dependency.ActionDependency;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
@@ -252,17 +252,15 @@ public class CoordPushDependencyCheckXCo
         if (jpaService != null) {
             try {
                 if (isChangeInDependency) {
-                    CoordActionQueryExecutor.getInstance().executeUpdate(
-                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
-                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
-                        // since event is not to be generated unless action
-                        // RUNNING via StartX
+                    jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+                    if (EventHandlerService.isEnabled()
+                            && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+                        //since event is not to be generated unless action RUNNING via StartX
                         generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                     }
                 }
                 else {
-                    CoordActionQueryExecutor.getInstance().executeUpdate(
-                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
+                    jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
                 }
             }
             catch (JPAExecutorException jex) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Sun Sep  8 02:29:31 2013
@@ -42,11 +42,8 @@ import org.apache.oozie.command.RerunTra
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.CoordUtils;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -204,7 +201,7 @@ public class CoordRerunXCommand extends 
         coordAction.setExternalStatus("");
         coordAction.setRerunTime(new Date());
         coordAction.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction));
+        updateList.add(coordAction);
         writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
     }
 
@@ -382,7 +379,7 @@ public class CoordRerunXCommand extends 
                 coordJob.resetPending();
             }
         }
-        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
+        updateList.add(coordJob);
     }
 
     /* (non-Javadoc)
@@ -391,7 +388,7 @@ public class CoordRerunXCommand extends 
     @Override
     public void performWrites() throws CommandException {
         try {
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             if (EventHandlerService.isEnabled()) {
                 generateEvents(coordJob);
             }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Sun Sep  8 02:29:31 2013
@@ -31,13 +31,10 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.ResumeTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.command.wf.ResumeXCommand;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -107,7 +104,7 @@ public class CoordResumeXCommand extends
         coordJob.setLastModifiedTime(new Date());
         LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
                 + coordJob.isPending());
-        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
+        updateList.add(coordJob);
     }
 
     @Override
@@ -145,8 +142,7 @@ public class CoordResumeXCommand extends
                 coordJob.resetPending();
                 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
                         + coordJob.getStatus());
-                updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME,
-                        coordJob));
+                updateList.add(coordJob);
             }
         }
     }
@@ -163,7 +159,7 @@ public class CoordResumeXCommand extends
     @Override
     public void performWrites() throws CommandException {
         try {
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);
@@ -174,7 +170,7 @@ public class CoordResumeXCommand extends
         action.setStatus(CoordinatorActionBean.Status.RUNNING);
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action));
+        updateList.add(action);
     }
 
     @Override

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java Sun Sep  8 02:29:31 2013
@@ -53,7 +53,7 @@ import org.apache.oozie.coord.CoordELEva
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -1094,7 +1094,7 @@ public class CoordSubmitXCommand extends
         if (!dryrun) {
             coordJob.setLastModifiedTime(new Date());
             try {
-                CoordJobQueryExecutor.getInstance().insert(coordJob);
+                jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
             }
             catch (JPAExecutorException jpaee) {
                 coordJob.setId(null);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Sun Sep  8 02:29:31 2013
@@ -31,12 +31,9 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.SuspendTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -142,8 +139,8 @@ public class CoordSuspendXCommand extend
                 coordJob.resetPending();
                 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
                         + coordJob.getStatus());
-                updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
-           }
+                updateList.add(coordJob);
+            }
         }
     }
 
@@ -162,13 +159,13 @@ public class CoordSuspendXCommand extend
         coordJob.setLastModifiedTime(new Date());
         coordJob.setSuspendedTime(new Date());
         LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
-        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
+        updateList.add(coordJob);
     }
 
     @Override
     public void performWrites() throws CommandException {
         try {
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);
@@ -179,7 +176,7 @@ public class CoordSuspendXCommand extend
         action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
         action.incrementAndGetPending();
         action.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action));
+        updateList.add(action);
     }
 
     @Override

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java Sun Sep  8 02:29:31 2013
@@ -25,8 +25,7 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.command.UnpauseTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -114,7 +113,7 @@ public class CoordUnpauseXCommand extend
     @Override
     public void updateJob() throws CommandException {
         try {
-            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coordJob);
+            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Sun Sep  8 02:29:31 2013
@@ -30,16 +30,14 @@ import org.apache.oozie.action.ActionExe
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionCheckerService;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
@@ -64,7 +62,7 @@ public class ActionCheckXCommand extends
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private boolean generateEvent = false;
 
     public ActionCheckXCommand(String actionId) {
@@ -142,13 +140,12 @@ public class ActionCheckXCommand extends
     @Override
     protected void verifyPrecondition() throws CommandException, PreconditionException {
         if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
-            throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr());
+            throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
         }
         if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
             wfAction.setLastCheckTime(new Date());
             try {
-                WorkflowActionQueryExecutor.getInstance().executeUpdate(
-                        WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction);
+                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);
@@ -195,10 +192,9 @@ public class ActionCheckXCommand extends
                 }
             }
             wfAction.setLastCheckTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
+            updateList.add(wfAction);
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
-                    wfJob));
+            updateList.add(wfJob);
         }
         catch (ActionExecutorException ex) {
             LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
@@ -224,15 +220,14 @@ public class ActionCheckXCommand extends
                     break;
             }
             wfAction.setLastCheckTime(new Date());
-            updateList = new ArrayList<UpdateEntry>();
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
+            updateList = new ArrayList<JsonBean>();
+            updateList.add(wfAction);
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
-                    wfJob));
+            updateList.add(wfJob);
         }
         finally {
             try {
-                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
                 if (generateEvent && EventHandlerService.isEnabled()) {
                     generateEvent(wfAction, wfJob.getUser());
                 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Sun Sep  8 02:29:31 2013
@@ -39,13 +39,10 @@ import org.apache.oozie.client.SLAEvent.
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -68,7 +65,7 @@ public class ActionEndXCommand extends A
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionEndXCommand(String actionId, String type) {
@@ -128,7 +125,7 @@ public class ActionEndXCommand extends A
             }
         }
         else {
-            throw new PreconditionException(ErrorCode.E0812, wfAction.isPending(), wfAction.getStatusStr());
+            throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
         }
 
         executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
@@ -214,16 +211,16 @@ public class ActionEndXCommand extends A
                 if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
                     SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
                     LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
-                            + ", Set pending=" + wfAction.isPending());
+                            + ", Set pending=" + wfAction.getPending());
                     if(slaEvent != null) {
                         insertList.add(slaEvent);
                     }
                     queue(new SignalXCommand(jobId, actionId));
                 }
             }
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
+            updateList.add(wfAction);
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+            updateList.add(wfJob);
         }
         catch (ActionExecutorException ex) {
             LOG.warn(
@@ -258,13 +255,13 @@ public class ActionEndXCommand extends A
             DagELFunctions.setActionInfo(wfInstance, wfAction);
             wfJob.setWorkflowInstance(wfInstance);
 
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
+            updateList.add(wfAction);
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+            updateList.add(wfJob);
         }
         finally {
             try {
-                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
                 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                     generateEvent(wfAction, wfJob.getUser());
                 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Sun Sep  8 02:29:31 2013
@@ -31,13 +31,10 @@ import org.apache.oozie.client.SLAEvent.
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.control.ControlNodeActionExecutor;
@@ -61,7 +58,7 @@ public class ActionKillXCommand extends 
     private WorkflowJobBean wfJob;
     private WorkflowActionBean wfAction;
     private JPAService jpaService = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionKillXCommand(String actionId, String type) {
@@ -140,9 +137,9 @@ public class ActionKillXCommand extends 
                     wfAction.setStatus(WorkflowActionBean.Status.KILLED);
                     wfAction.setEndTime(new Date());
 
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
+                    updateList.add(wfAction);
                     wfJob.setLastModifiedTime(new Date());
-                    updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_MODTIME, wfJob));
+                    updateList.add(wfJob);
                     // Add SLA status event (KILLED) for WF_ACTION
                     SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
                             SlaAppType.WORKFLOW_ACTION);
@@ -159,9 +156,9 @@ public class ActionKillXCommand extends 
                     wfAction.setEndTime(new Date());
 
                     wfJob.setStatus(WorkflowJobBean.Status.KILLED);
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
+                    updateList.add(wfAction);
                     wfJob.setLastModifiedTime(new Date());
-                    updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob));
+                    updateList.add(wfJob);
                     // What will happen to WF and COORD_ACTION, NOTIFICATION?
                     SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
                             SlaAppType.WORKFLOW_ACTION);
@@ -173,7 +170,7 @@ public class ActionKillXCommand extends 
                 }
                 finally {
                     try {
-                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+                        jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
                         if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                             generateEvent(wfAction, wfJob.getUser());
                         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Sun Sep  8 02:29:31 2013
@@ -40,13 +40,10 @@ import org.apache.oozie.client.SLAEvent.
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -73,7 +70,7 @@ public class ActionStartXCommand extends
     private WorkflowActionBean wfAction = null;
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public ActionStartXCommand(String actionId, String type) {
@@ -130,7 +127,7 @@ public class ActionStartXCommand extends
             }
         }
         else {
-            throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr());
+            throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
         }
 
         executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
@@ -245,9 +242,9 @@ public class ActionStartXCommand extends
 
                 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
 
-                updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
+                updateList.add(wfAction);
                 wfJob.setLastModifiedTime(new Date());
-                updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+                updateList.add(wfJob);
                 // Add SLA status event (STARTED) for WF_ACTION
                 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
                         SlaAppType.WORKFLOW_ACTION);
@@ -298,13 +295,13 @@ public class ActionStartXCommand extends
                     }
                     break;
             }
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
+            updateList.add(wfAction);
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+            updateList.add(wfJob);
         }
         finally {
             try {
-                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
                 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                     generateEvent(wfAction, wfJob.getUser());
                 }
@@ -322,9 +319,9 @@ public class ActionStartXCommand extends
     private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
             throws CommandException {
         failJob(context);
-        updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
+        updateList.add(wfAction);
         wfJob.setLastModifiedTime(new Date());
-        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
+        updateList.add(wfJob);
         SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
                 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
         if(slaEvent1 != null) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Sun Sep  8 02:29:31 2013
@@ -29,13 +29,10 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -64,7 +61,7 @@ public class KillXCommand extends Workfl
     private List<WorkflowActionBean> actionList;
     private ActionService actionService;
     private JPAService jpaService = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
     public KillXCommand(String wfId) {
@@ -144,7 +141,8 @@ public class KillXCommand extends Workfl
                         || action.getStatus() == WorkflowActionBean.Status.DONE) {
                     action.setPending();
                     action.setStatus(WorkflowActionBean.Status.KILLED);
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
+
+                    updateList.add(action);
 
                     queue(new ActionKillXCommand(action.getId(), action.getType()));
                 }
@@ -162,7 +160,7 @@ public class KillXCommand extends Workfl
                     if(slaEvent != null) {
                         insertList.add(slaEvent);
                     }
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
+                    updateList.add(action);
                     if (EventHandlerService.isEnabled()
                             && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
                         generateEvent(action, wfJob.getUser());
@@ -170,8 +168,8 @@ public class KillXCommand extends Workfl
                 }
             }
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob));
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+            updateList.add(wfJob);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             if (EventHandlerService.isEnabled()) {
                 generateEvent(wfJob);
             }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java Sun Sep  8 02:29:31 2013
@@ -43,12 +43,10 @@ import org.apache.oozie.client.WorkflowJ
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -89,7 +87,7 @@ public class ReRunXCommand extends Workf
     private WorkflowJobBean wfBean;
     private List<WorkflowActionBean> actions;
     private JPAService jpaService;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> deleteList = new ArrayList<JsonBean>();
 
     private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
@@ -222,9 +220,9 @@ public class ReRunXCommand extends Workf
 
         try {
             wfBean.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_RERUN, wfBean));
+            updateList.add(wfBean);
             // call JPAExecutor to do the bulk writes
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
+            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
         }
         catch (JPAExecutorException je) {
             throw new CommandException(je);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Sun Sep  8 02:29:31 2013
@@ -32,16 +32,14 @@ import org.apache.oozie.action.control.J
 import org.apache.oozie.action.control.KillActionExecutor;
 import org.apache.oozie.action.control.StartActionExecutor;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.JPAService;
@@ -58,7 +56,7 @@ public class ResumeXCommand extends Work
     private String id;
     private JPAService jpaService = null;
     private WorkflowJobBean workflow = null;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     public ResumeXCommand(String id) {
         super("resume", "resume", 1);
@@ -83,8 +81,7 @@ public class ResumeXCommand extends Work
                     // START_MANUAL or END_RETRY or END_MANUAL
                     if (action.isRetryOrManual()) {
                         action.setPendingOnly();
-                        updateList.add(new UpdateEntry<WorkflowActionQuery>(
-                                WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
+                        updateList.add(action);
                     }
 
                     if (action.isPending()) {
@@ -130,9 +127,8 @@ public class ResumeXCommand extends Work
                 }
 
                 workflow.setLastModifiedTime(new Date());
-                updateList.add(new UpdateEntry<WorkflowJobQuery>(
-                        WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, workflow));
-                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+                updateList.add(workflow);
+                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
                 if (EventHandlerService.isEnabled()) {
                     generateEvent(workflow);
                 }
@@ -193,8 +189,7 @@ public class ResumeXCommand extends Work
     @Override
     protected void verifyPrecondition() throws CommandException, PreconditionException {
         if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
-            throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr()
-                    + " is not SUSPENDED");
+            throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
         }
     }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Sun Sep  8 02:29:31 2013
@@ -31,13 +31,10 @@ import org.apache.oozie.XException;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -73,7 +70,7 @@ public class SignalXCommand extends Work
     private String actionId;
     private WorkflowJobBean wfJob;
     private WorkflowActionBean wfAction;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
     private boolean generateEvent = false;
     private String wfJobErrorCode;
@@ -191,8 +188,7 @@ public class SignalXCommand extends Work
                 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
                 queue(new NotificationXCommand(wfJob, wfAction));
             }
-            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
-                    wfAction));
+            updateList.add(wfAction);
             WorkflowInstance.Status endStatus = workflowInstance.getStatus();
             if (endStatus != initialStatus) {
                 generateEvent = true;
@@ -208,8 +204,7 @@ public class SignalXCommand extends Work
 
                     actionToKill.setPending();
                     actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
-                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill));
+                    updateList.add(actionToKill);
                     queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
                 }
 
@@ -228,8 +223,7 @@ public class SignalXCommand extends Work
                     if(slaEvent != null) {
                         insertList.add(slaEvent);
                     }
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
-                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail));
+                    updateList.add(actionToFail);
                 }
             }
             catch (JPAExecutorException je) {
@@ -274,18 +268,15 @@ public class SignalXCommand extends Work
                     try {
                         String tmpNodeConf = nodeDef.getConf();
                         String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
-                        LOG.debug(
-                                "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], " +
-                                "after resolve [{3}]",
-                                jobId, actionId, tmpNodeConf, actionConf);
+                        LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
+                                        jobId, actionId, tmpNodeConf, actionConf);
                         if (wfAction.getErrorCode() != null) {
                             wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
                         }
                         else {
                             wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
                         }
-                        updateList.add(new UpdateEntry<WorkflowActionQuery>(
-                                WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction));
+                        updateList.add(wfAction);
                     }
                     catch (Exception ex) {
                         LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
@@ -310,7 +301,7 @@ public class SignalXCommand extends Work
                         oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
 
                         oldAction.setPending();
-                        updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING, oldAction));
+                        updateList.add(oldAction);
 
                         queue(new SignalXCommand(jobId, oldAction.getId()));
                     }
@@ -329,8 +320,7 @@ public class SignalXCommand extends Work
                                 .getDefinition(), wfJob.getConf());
                         newAction.setSlaXml(actionSlaXml);
                         insertList.add(newAction);
-                        LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId()
-                                + ", Authcode:" + newAction.getCred());
+                        LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
                         queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
                     }
                 }
@@ -342,10 +332,9 @@ public class SignalXCommand extends Work
 
         try {
             wfJob.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(
-                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob));
+            updateList.add(wfJob);
             // call JPAExecutor to do the bulk writes
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             if (generateEvent && EventHandlerService.isEnabled()) {
                 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
             }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SubmitXCommand.java Sun Sep  8 02:29:31 2013
@@ -42,7 +42,7 @@ import org.apache.oozie.util.ParamChecke
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.store.StoreException;
@@ -225,7 +225,7 @@ public class SubmitXCommand extends Work
                 JPAService jpaService = Services.get().get(JPAService.class);
                 if (jpaService != null) {
                     try {
-                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+                        jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
                     }
                     catch (JPAExecutorException je) {
                         throw new CommandException(je);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Sun Sep  8 02:29:31 2013
@@ -25,15 +25,13 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -48,7 +46,7 @@ public class SuspendXCommand extends Wor
     private final String wfid;
     private WorkflowJobBean wfJobBean;
     private JPAService jpaService;
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+    private List<JsonBean> updateList = new ArrayList<JsonBean>();
 
     public SuspendXCommand(String id) {
         super("suspend", "suspend", 1);
@@ -61,9 +59,8 @@ public class SuspendXCommand extends Wor
         try {
             suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
             this.wfJobBean.setLastModifiedTime(new Date());
-            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
-                    this.wfJobBean));
-            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
+            updateList.add(this.wfJobBean);
+            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
             queue(new NotificationXCommand(this.wfJobBean));
         }
         catch (WorkflowException e) {
@@ -90,7 +87,7 @@ public class SuspendXCommand extends Wor
      * @throws CommandException thrown if unable set pending false for actions
      */
     public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
-            String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException {
+            String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException {
         if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
             workflow.getWorkflowInstance().suspend();
             WorkflowInstance wfInstance = workflow.getWorkflowInstance();
@@ -115,7 +112,7 @@ public class SuspendXCommand extends Wor
      * @throws CommandException thrown if failed to update workflow action
      */
     private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
-            List<UpdateEntry> updateList) throws CommandException {
+            List<JsonBean> updateList) throws CommandException {
         List<WorkflowActionBean> actions;
         try {
             actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
@@ -131,8 +128,7 @@ public class SuspendXCommand extends Wor
                 if (updateList != null) { // will be null when suspendJob
                                           // invoked statically via
                                           // handleNonTransient()
-                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
-                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
+                    updateList.add(action);
                 }
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateDeleteJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Class for updating and deleting beans in bulk
+ */
+public class BulkUpdateDeleteJPAExecutor implements JPAExecutor<Void> {
+
+    private Collection<JsonBean> updateList;
+    private Collection<JsonBean> deleteList;
+    private boolean forRerun = true;
+
+    /**
+     * Initialize the JPAExecutor using the update and delete lists of JSON beans and the forRerun flag
+     * @param updateList beans to merge; may be null
+     * @param deleteList beans to remove; may be null
+     */
+    public BulkUpdateDeleteJPAExecutor(Collection<JsonBean> updateList, Collection<JsonBean> deleteList,
+            boolean forRerun) {
+        this.updateList = updateList;
+        this.deleteList = deleteList;
+        this.forRerun = forRerun;
+    }
+
+    public BulkUpdateDeleteJPAExecutor() {
+    }
+
+    /**
+     * Sets the update list for JSON bean
+     *
+     * @param updateList
+     */
+    public void setUpdateList(Collection<JsonBean> updateList) {
+        this.updateList = updateList;
+    }
+
+    /**
+     * Sets the delete list for JSON bean
+     *
+     * @param deleteList
+     */
+    public void setDeleteList(Collection<JsonBean> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    /**
+     * Sets whether this executor is used for the RerunX command; if false, it is used for ChangeX
+     *
+     * @param forRerun
+     */
+    public void setForRerun(boolean forRerun) {
+        this.forRerun = forRerun;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkUpdateDeleteJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            if (updateList != null) {
+                for (JsonBean entity : updateList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.merge(entity);
+                }
+            }
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (deleteList != null) {
+                for (JsonBean entity : deleteList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    if (forRerun) {
+                        em.remove(em.merge(entity));
+                    }
+                    else if (entity instanceof CoordinatorActionBean) {
+                        Query g = em.createNamedQuery("DELETE_UNSCHEDULED_ACTION");
+                        String coordActionId = ((CoordinatorActionBean) entity).getId();
+                        g.setParameter("id", coordActionId);
+                        int actionsDeleted = g.executeUpdate();
+                        if (actionsDeleted == 0)
+                            throw new JPAExecutorException(ErrorCode.E1022, coordActionId);
+                    }
+                    else {
+                        em.remove(em.merge(entity));
+                    }
+                }
+            }
+            return null;
+        }
+        catch (JPAExecutorException je) {
+            throw je;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStartJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Class for inserting and updating beans in bulk; CoordinatorActionBean updates
+ * are issued through the UPDATE_COORD_ACTION_FOR_START named query
+ */
+public class BulkUpdateInsertForCoordActionStartJPAExecutor implements JPAExecutor<Void> {
+
+    private Collection<JsonBean> updateList;
+    private Collection<JsonBean> insertList;
+
+    /**
+     * Initialize the JPAExecutor using the update and insert list of JSON beans
+     *
+     * @param updateList
+     * @param insertList
+     */
+    public BulkUpdateInsertForCoordActionStartJPAExecutor(Collection<JsonBean> updateList,
+            Collection<JsonBean> insertList) {
+        this.updateList = updateList;
+        this.insertList = insertList;
+    }
+
+    public BulkUpdateInsertForCoordActionStartJPAExecutor() {
+    }
+
+    /**
+     * Sets the update list for JSON bean
+     *
+     * @param updateList
+     */
+    public void setUpdateList(Collection<JsonBean> updateList) {
+        this.updateList = updateList;
+    }
+
+    /**
+     * Sets the insert list for JSON bean
+     *
+     * @param insertList
+     */
+    public void setInsertList(Collection<JsonBean> insertList) {
+        this.insertList = insertList;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkUpdateInsertForCoordActionStartJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            if (insertList != null) {
+                for (JsonBean entity : insertList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.persist(entity);
+                }
+            }
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (updateList != null) {
+                for (JsonBean entity : updateList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    if (entity instanceof CoordinatorActionBean) {
+                        CoordinatorActionBean action = (CoordinatorActionBean) entity;
+                        Query q = em.createNamedQuery("UPDATE_COORD_ACTION_FOR_START");
+                        q.setParameter("id", action.getId());
+                        q.setParameter("status", action.getStatus().toString());
+                        q.setParameter("lastModifiedTime", new Date());
+                        q.setParameter("runConf", action.getRunConf());
+                        q.setParameter("externalId", action.getExternalId());
+                        q.setParameter("pending", action.getPending());
+                        q.setParameter("errorCode", action.getErrorCode());
+                        q.setParameter("errorMessage", action.getErrorMessage());
+                        q.executeUpdate();
+                    }
+                    else {
+                        em.merge(entity);
+                    }
+                }
+            }
+            // Since the return type is Void, we have to return null
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertForCoordActionStatusJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
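+ * Class for inserting and updating beans in bulk; CoordinatorActionBean updates
+ * are issued through the UPDATE_COORD_ACTION_STATUS_PENDING_TIME named query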
+ */
+public class BulkUpdateInsertForCoordActionStatusJPAExecutor implements JPAExecutor<Void> {
+
+    private Collection<JsonBean> updateList;
+    private Collection<JsonBean> insertList;
+
+    /**
+     * Initialize the JPAExecutor using the update and insert lists of JSON beans
+     * @param updateList beans to update; may be null
+     * @param insertList beans to persist; may be null
+     */
+    public BulkUpdateInsertForCoordActionStatusJPAExecutor(Collection<JsonBean> updateList,
+            Collection<JsonBean> insertList) {
+        this.updateList = updateList;
+        this.insertList = insertList;
+    }
+
+    public BulkUpdateInsertForCoordActionStatusJPAExecutor() {
+    }
+
+    /**
+     * Sets the update list for JSON bean
+     *
+     * @param updateList
+     */
+    public void setUpdateList(Collection<JsonBean> updateList) {
+        this.updateList = updateList;
+    }
+
+    /**
+     * Sets the insert list for JSON bean
+     *
+     * @param insertList
+     */
+    public void setInsertList(Collection<JsonBean> insertList) {
+        this.insertList = insertList;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkUpdateInsertForCoordActionStatusJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            if (insertList != null) {
+                for (JsonBean entity : insertList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.persist(entity);
+                }
+            }
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (updateList != null) {
+                for (JsonBean entity : updateList) {
+                    ParamChecker.notNull(entity, "JsonBean");
+                    if (entity instanceof CoordinatorActionBean) {
+                        CoordinatorActionBean action = (CoordinatorActionBean) entity;
+                        Query q = em.createNamedQuery("UPDATE_COORD_ACTION_STATUS_PENDING_TIME");
+                        q.setParameter("id", action.getId());
+                        q.setParameter("status", action.getStatus().toString());
+                        q.setParameter("pending", action.getPending());
+                        q.setParameter("lastModifiedTime", new Date());
+                        q.executeUpdate();
+                    }
+                    else {
+                        em.merge(entity);
+                    }
+                }
+            }
+            // Since the return type is Void, we have to return null
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkUpdateInsertJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+
+import javax.persistence.EntityManager;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Class for inserting and updating beans in bulk: beans in the insert list are
+ * persisted first, then beans in the update list are merged
+*/
+public class BulkUpdateInsertJPAExecutor implements JPAExecutor<Void> {
+
+    private Collection<JsonBean> updateList;
+    private Collection<JsonBean> insertList;
+
+    /**
+     * Initialize the JPAExecutor using the insert and update list of JSON beans
+     * @param updateList
+     * @param insertList
+     */
+    public BulkUpdateInsertJPAExecutor(Collection<JsonBean> updateList, Collection<JsonBean> insertList) {
+        this.updateList = updateList;
+        this.insertList = insertList;
+    }
+
+    public BulkUpdateInsertJPAExecutor() {
+    }
+
+    /**
+     * Sets the update list for JSON bean
+     * @param updateList
+     */
+    public void setUpdateList(Collection<JsonBean> updateList) {
+        this.updateList = updateList;
+    }
+
+    /**
+     * Sets the insert list for JSON bean
+     * @param insertList
+     */
+    public void setInsertList(Collection<JsonBean> insertList) {
+        this.insertList = insertList;
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkUpdateInsertJPAExecutor";
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            if (insertList!= null){
+                for (JsonBean entity: insertList){
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.persist(entity);
+                }
+            }
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (updateList!= null){
+                for (JsonBean entity: updateList){
+                    ParamChecker.notNull(entity, "JsonBean");
+                    em.merge(entity);
+                }
+            }
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionUpdateJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Update the given bundle action bean to DB.
+ */
+public class BundleActionUpdateJPAExecutor implements JPAExecutor<Void> {
+
+    private BundleActionBean bundleAction = null;
+
+    /**
+     * The constructor for class {@link BundleActionUpdateJPAExecutor}
+     *
+     * @param bundleAction bundle action bean
+     */
+    public BundleActionUpdateJPAExecutor(BundleActionBean bundleAction) {
+        ParamChecker.notNull(bundleAction, " bundleAction");
+        this.bundleAction = bundleAction;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BundleActionUpdateJPAExecutor";
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        em.merge(bundleAction);
+        return null;
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobUpdateJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ *  Update the given bundle job bean to DB.
+ */
+public class BundleJobUpdateJPAExecutor implements JPAExecutor<Void> {
+
+    private BundleJobBean bundleJob = null;
+
+    /**
+     * The constructor for class {@link BundleJobUpdateJPAExecutor}
+     *
+     * @param bundleJob bundle job bean
+     */
+    public BundleJobUpdateJPAExecutor(BundleJobBean bundleJob) {
+        ParamChecker.notNull(bundleJob, "bundleJob");
+        this.bundleJob = bundleJob;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BundleJobUpdateJPAExecutor";
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        em.merge(bundleJob);
+        return null;
+    }
+
+}