You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:47:41 UTC

svn commit: r1520829 [4/6] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java Sun Sep  8 02:47:39 2013
@@ -39,13 +39,15 @@ import org.apache.oozie.command.bundle.B
 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.util.DateUtils;
@@ -637,7 +639,7 @@ public class StatusTransitService implem
                 bundleJob.resetPending();
                 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
             }
-            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
+            BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob);
         }
 
         private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
@@ -662,7 +664,7 @@ public class StatusTransitService implem
             if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
                 LOG.debug("Updating coord job " + coordJob.getId());
                 coordJob.setLastModifiedTime(new Date());
-                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
             }
             // update bundle action only when status changes in coord job
             if (coordJob.getBundleId() != null) {
@@ -687,7 +689,7 @@ public class StatusTransitService implem
             }
             boolean hasChange = prevPending != coordJob.isPending();
             if (saveToDB && hasChange) {
-                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
             }
             return hasChange;
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Sun Sep  8 02:47:39 2013
@@ -40,16 +40,18 @@ import org.apache.oozie.client.event.Job
 import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-
-import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.ServiceException;
@@ -112,7 +114,9 @@ public class SLACalculatorMemory impleme
                             break;
                     }
                     if (isJobModified) {
-                        jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
+                        summaryBean.setLastModifiedTime(new Date());
+                        SLASummaryQueryExecutor.getInstance().executeUpdate(
+                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
                     }
                 }
                 catch (Exception e) {
@@ -352,7 +356,9 @@ public class SLACalculatorMemory impleme
                 slaSummaryBean.setActualStart(slaCalc.getActualStart());
                 slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
                 slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-                jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
+                slaSummaryBean.setLastModifiedTime(new Date());
+                SLASummaryQueryExecutor.getInstance().executeUpdate(
+                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
                 if (eventProc == 7) {
                     historySet.add(jobId);
                     slaMap.remove(jobId);
@@ -397,7 +403,7 @@ public class SLACalculatorMemory impleme
                 List<JsonBean> insertList = new ArrayList<JsonBean>();
                 insertList.add(reg);
                 insertList.add(new SLASummaryBean(slaCalc));
-                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
                 LOG.trace("SLA Registration Event - Job:" + jobId);
                 return true;
             }
@@ -442,10 +448,11 @@ public class SLACalculatorMemory impleme
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
                 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
                 slaMap.put(jobId, slaCalc);
-                List<JsonBean> updateList = new ArrayList<JsonBean>();
-                updateList.add(reg);
-                updateList.add(new SLASummaryBean(slaCalc));
-                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+                List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+                updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
+                updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
+                        new SLASummaryBean(slaCalc)));
+                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
                 LOG.trace("SLA Registration Event - Job:" + jobId);
                 return true;
             }
@@ -524,7 +531,9 @@ public class SLACalculatorMemory impleme
         }
 
         if (hasSla) {
-            jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
+            slaInfo.setLastModifiedTime(new Date());
+            SLASummaryQueryExecutor.getInstance().executeUpdate(
+                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
         }
         return hasSla;
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Sun Sep  8 02:47:39 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.sla;
 
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -46,6 +47,8 @@ import org.json.simple.JSONObject;
 @Table(name = "SLA_REGISTRATION")
 @NamedQueries({
 
+ @NamedQuery(name = "UPDATE_SLA_REG_ALL", query = "update SLARegistrationBean w set w.jobId = :jobId, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.slaConfig = :slaConfig, w.notificationMsg = :notificationMsg, w.upstreamApps = :upstreamApps, w.appType = :appType, w.appName = :appName, w.user = :user, w.parentId = :parentId, w.jobData = :jobData where w.jobId = :jobId"),
+
  @NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"),
 
  @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") })
@@ -160,6 +163,10 @@ public class SLARegistrationBean impleme
         return DateUtils.toDate(nominalTimeTS);
     }
 
+    public Timestamp getNominalTimestamp() {
+        return this.nominalTimeTS;
+    }
+
     public void setNominalTime(Date nominalTime) {
         this.nominalTimeTS = DateUtils.convertDateToTimestamp(nominalTime);
     }
@@ -168,6 +175,10 @@ public class SLARegistrationBean impleme
         return DateUtils.toDate(expectedStartTS);
     }
 
+    public Timestamp getExpectedStartTimestamp() {
+        return this.expectedStartTS;
+    }
+
     public void setExpectedStart(Date expectedStart) {
         this.expectedStartTS = DateUtils.convertDateToTimestamp(expectedStart);
     }
@@ -176,6 +187,10 @@ public class SLARegistrationBean impleme
         return DateUtils.toDate(expectedEndTS);
     }
 
+    public Timestamp getExpectedEndTimestamp() {
+        return this.expectedEndTS;
+    }
+
     public void setExpectedEnd(Date expectedEnd) {
         this.expectedEndTS = DateUtils.convertDateToTimestamp(expectedEnd);
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Sun Sep  8 02:47:39 2013
@@ -47,6 +47,8 @@ import org.json.simple.JSONObject;
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
 
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"),
+
  @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
 
  @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
@@ -174,6 +176,10 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(nominalTimeTS);
     }
 
+    public Timestamp getNominalTimestamp() {
+        return this.nominalTimeTS;
+    }
+
     public void setNominalTime(Date nominalTime) {
         this.nominalTimeTS = DateUtils.convertDateToTimestamp(nominalTime);
     }
@@ -183,6 +189,10 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(expectedStartTS);
     }
 
+    public Timestamp getExpectedStartTimestamp() {
+        return this.expectedStartTS;
+    }
+
     public void setExpectedStart(Date expectedStart) {
         this.expectedStartTS = DateUtils.convertDateToTimestamp(expectedStart);
     }
@@ -191,6 +201,9 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(expectedEndTS);
     }
 
+    public Timestamp getExpectedEndTimestamp() {
+        return this.expectedEndTS;
+    }
     public void setExpectedEnd(Date expectedEnd) {
         this.expectedEndTS = DateUtils.convertDateToTimestamp(expectedEnd);
     }
@@ -207,6 +220,10 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(actualStartTS);
     }
 
+    public Timestamp getActualStartTimestamp() {
+        return this.actualStartTS;
+    }
+
     public void setActualStart(Date actualStart) {
         this.actualStartTS = DateUtils.convertDateToTimestamp(actualStart);
     }
@@ -215,6 +232,10 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(actualEndTS);
     }
 
+    public Timestamp getActualEndTimestamp() {
+        return this.actualEndTS;
+    }
+
     public void setActualEnd(Date actualEnd) {
         this.actualEndTS = DateUtils.convertDateToTimestamp(actualEnd);
     }
@@ -295,6 +316,10 @@ public class SLASummaryBean implements J
         return DateUtils.toDate(lastModifiedTS);
     }
 
+    public Timestamp getLastModifiedTimestamp() {
+        return this.lastModifiedTS;
+    }
+
     public void setLastModifiedTime(Date lastModified) {
         this.lastModifiedTS = DateUtils.convertDateToTimestamp(lastModified);
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java Sun Sep  8 02:47:39 2013
@@ -34,6 +34,10 @@ import org.apache.oozie.CoordinatorJobIn
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
@@ -351,14 +355,12 @@ public class CoordinatorStore extends St
      * @param action Action Bean
      * @throws StoreException if action doesn't exist
      */
-    public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
+    public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
         ParamChecker.notNull(action, "CoordinatorActionBean");
         doOperation("updateCoordinatorAction", new Callable<Void>() {
-            public Void call() throws StoreException {
-                Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
-                q.setParameter("id", action.getId());
-                setActionQueryParameters(action, q);
-                q.executeUpdate();
+            public Void call() throws StoreException, JPAExecutorException {
+                CoordActionQueryExecutor.getInstance().executeUpdate(
+                        CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION, action);
                 return null;
             }
         });
@@ -370,17 +372,12 @@ public class CoordinatorStore extends St
      * @param action Action Bean
      * @throws StoreException if action doesn't exist
      */
-    public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
+    public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
         ParamChecker.notNull(action, "CoordinatorActionBean");
         doOperation("updateCoordinatorAction", new Callable<Void>() {
-            public Void call() throws StoreException {
-                Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
-                q.setParameter("id", action.getId());
-                q.setParameter("missingDependencies", action.getMissingDependencies());
-                q.setParameter("lastModifiedTime", new Date());
-                q.setParameter("status", action.getStatus().toString());
-                q.setParameter("actionXml", action.getActionXml());
-                q.executeUpdate();
+            public Void call() throws StoreException, JPAExecutorException {
+                CoordActionQueryExecutor.getInstance().executeUpdate(
+                        CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
                 return null;
             }
         });
@@ -395,11 +392,8 @@ public class CoordinatorStore extends St
     public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
         ParamChecker.notNull(job, "CoordinatorJobBean");
         doOperation("updateJob", new Callable<Void>() {
-            public Void call() throws StoreException {
-                Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
-                q.setParameter("id", job.getId());
-                setJobQueryParameters(job, q);
-                q.executeUpdate();
+            public Void call() throws StoreException, JPAExecutorException {
+                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
                 return null;
             }
         });
@@ -408,12 +402,8 @@ public class CoordinatorStore extends St
     public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
         ParamChecker.notNull(job, "CoordinatorJobBean");
         doOperation("updateJobStatus", new Callable<Void>() {
-            public Void call() throws StoreException {
-                Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
-                q.setParameter("id", job.getId());
-                q.setParameter("status", job.getStatus().toString());
-                q.setParameter("lastModifiedTime", new Date());
-                q.executeUpdate();
+            public Void call() throws StoreException, JPAExecutorException {
+                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, job);
                 return null;
             }
         });

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java Sun Sep  8 02:47:39 2013
@@ -35,6 +35,9 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.WorkflowsInfo;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob.Status;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -198,11 +201,9 @@ public class WorkflowStore extends Store
     public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
         ParamChecker.notNull(wfBean, "WorkflowJobBean");
         doOperation("updateWorkflow", new Callable<Void>() {
-            public Void call() throws SQLException, StoreException, WorkflowException {
-                Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
-                q.setParameter("id", wfBean.getId());
-                setWFQueryParameters(wfBean, q);
-                q.executeUpdate();
+            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
+                WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
                 return null;
             }
         });
@@ -274,11 +275,9 @@ public class WorkflowStore extends Store
     public void updateAction(final WorkflowActionBean action) throws StoreException {
         ParamChecker.notNull(action, "WorkflowActionBean");
         doOperation("updateAction", new Callable<Void>() {
-            public Void call() throws SQLException, StoreException, WorkflowException {
-                Query q = entityManager.createNamedQuery("UPDATE_ACTION");
-                q.setParameter("id", action.getId());
-                setActionQueryParameters(action, q);
-                q.executeUpdate();
+            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
+                WorkflowActionQueryExecutor.getInstance().executeUpdate(
+                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
                 return null;
             }
         });
@@ -935,7 +934,7 @@ public class WorkflowStore extends Store
             action.setExecutionPath(a.getExecutionPath());
             action.setLastCheckTime(a.getLastCheckTime());
             action.setLogToken(a.getLogToken());
-            if (a.getPending() == true) {
+            if (a.isPending() == true) {
                 action.setPending();
             }
             action.setPendingAge(a.getPendingAge());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java Sun Sep  8 02:47:39 2013
@@ -18,9 +18,6 @@
 package org.apache.oozie.command;
 
 import java.util.Date;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.fail;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -43,12 +40,14 @@ import org.apache.oozie.executor.jpa.Bun
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -1393,16 +1392,16 @@ public class TestPurgeXCommand extends X
         CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob2.setAppName("coord2");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
         CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob3.setAppName("coord3");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
         CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob4.setAppName("coord4");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
         CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob5.setAppName("coord5");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
         WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -1803,16 +1802,16 @@ public class TestPurgeXCommand extends X
         CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob2.setAppName("coord2");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob2));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2);
         CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob3.setAppName("coord3");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob3));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3);
         CoordinatorJobBean coordJob4 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob4.setAppName("coord4");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob4));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob4);
         CoordinatorJobBean coordJob5 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
         coordJob5.setAppName("coord5");
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob5));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob5);
         WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
@@ -3822,7 +3821,7 @@ public class TestPurgeXCommand extends X
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
             assertNotNull(jpaService);
-            jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, job);
         }
         catch (JPAExecutorException je) {
             je.printStackTrace();
@@ -3837,7 +3836,7 @@ public class TestPurgeXCommand extends X
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
             assertNotNull(jpaService);
-            jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, job);
         }
         catch (JPAExecutorException je) {
             je.printStackTrace();

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java Sun Sep  8 02:47:39 2013
@@ -27,7 +27,8 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
@@ -84,7 +85,7 @@ public class TestBundleChangeXCommand ex
         coordJob.setBundleId(bundleJob.getId());
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, coordJob);
 
         BundleActionBean bundleAction = new BundleActionBean();
         bundleAction.setBundleActionId("11111");

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java Sun Sep  8 02:47:39 2013
@@ -267,6 +267,7 @@ public class TestBundleStartXCommand ext
         assertEquals(Job.Status.FAILED, actions.get(0).getStatus());
         Runnable runnable = new StatusTransitRunnable();
         runnable.run();
+        sleep(2000);
         job = jpaService.execute(bundleJobGetExecutor);
         assertEquals(job.getStatus(), Job.Status.KILLED);
     }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Sun Sep  8 02:47:39 2013
@@ -36,8 +36,8 @@ import org.apache.oozie.coord.CoordELFun
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
@@ -235,7 +235,9 @@ public class TestCoordActionInputCheckXC
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+                action);
         action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
@@ -280,7 +282,9 @@ public class TestCoordActionInputCheckXC
                 .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
         action.setPushMissingDependencies(pushMissingDependency);
-        jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
+                action);
 
         // Update action creation time
         String actionXML = action.getActionXml();
@@ -289,7 +293,9 @@ public class TestCoordActionInputCheckXC
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+                action);
         action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
@@ -362,7 +368,9 @@ public class TestCoordActionInputCheckXC
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+                action);
         action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
@@ -406,7 +414,9 @@ public class TestCoordActionInputCheckXC
                 .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
         action.setPushMissingDependencies(pushMissingDependency);
-        jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
+                action);
 
         // Update action creation time
         String actionXML = action.getActionXml();
@@ -415,7 +425,9 @@ public class TestCoordActionInputCheckXC
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(
+                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK,
+                action);
         action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
@@ -700,7 +712,7 @@ public class TestCoordActionInputCheckXC
         action.setStatus(CoordinatorAction.Status.WAITING);
         try {
             jpaService = Services.get().get(JPAService.class);
-            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+            CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
         }
         catch (JPAExecutorException se) {
             fail("Action ID " + coordJob.getId() + "@1" + " was not stored properly in db");

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java Sun Sep  8 02:47:39 2013
@@ -25,7 +25,8 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
@@ -143,7 +144,7 @@ public class TestCoordActionsKillXComman
                 "coord-action-get.xml", 0);
         action2.setNominalTime(DateUtils.parseDateOozieTZ("2009-12-15T02:00Z"));
         action2.setExternalId(null);
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action2));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action2);
 
         WorkflowJobBean wf = new WorkflowJobBean();
         WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Sun Sep  8 02:47:39 2013
@@ -31,11 +31,11 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
@@ -350,7 +350,7 @@ public class TestCoordChangeXCommand ext
         insertList.add(slaSummaryBean3);
 
         JPAService jpaService = Services.get().get(JPAService.class);
-        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
 
         new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java Sun Sep  8 02:47:39 2013
@@ -32,7 +32,8 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.PartitionDependencyManagerService;
@@ -101,7 +102,7 @@ public class TestCoordKillXCommand exten
         // RUNNINGWITHERROR if it had loaded status and had it as RUNNING in memory when CoordKill
         // executes and updates status to KILLED in database.
         job.setStatus(CoordinatorJob.Status.RUNNINGWITHERROR);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
         job = jpaService.execute(coordJobGetCmd);
         assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
         final CoordMaterializeTransitionXCommand transitionCmd = new CoordMaterializeTransitionXCommand(job.getId(), 3600);
@@ -199,7 +200,7 @@ public class TestCoordKillXCommand exten
         CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
 
         job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
 
         CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
         CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Sun Sep  8 02:47:39 2013
@@ -45,8 +45,9 @@ import org.apache.oozie.coord.CoordELFun
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
@@ -845,7 +846,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -894,7 +895,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -944,7 +945,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.SUCCEEDED, "coord-rerun-action1.xml", 0);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Sun Sep  8 02:47:39 2013
@@ -20,7 +20,8 @@ package org.apache.oozie.command.coord;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -127,7 +128,7 @@ public class TestCoordResumeXCommand ext
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
         job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
         JPAService jpaService = Services.get().get(JPAService.class);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
 
         assertNotNull(jpaService);
         CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Sun Sep  8 02:47:39 2013
@@ -329,7 +329,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus1, action.getStatus());
-        assertTrue(action.getPending() == false);
+        assertTrue(action.isPending() == false);
 
         assertTrue(engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED);
 
@@ -410,7 +410,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus1, action.getStatus());
-        assertFalse(action.getPending());
+        assertFalse(action.isPending());
 
         assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
 
@@ -509,7 +509,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus2, action.getStatus());
-        assertTrue(action.getPending() == false);
+        assertTrue(action.isPending() == false);
         assertEquals(WorkflowJob.Status.SUSPENDED, engine.getJob(jobId).getStatus());
         store2.commitTrx();
         store2.closeTrx();

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java Sun Sep  8 02:47:39 2013
@@ -133,7 +133,7 @@ public class TestActionStartXCommand ext
 
         WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
         WorkflowActionBean action = super.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
-        assertFalse(action.getPending());
+        assertFalse(action.isPending());
 
         assertNull(inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP));
         ActionStartXCommand startCmd = new ActionStartXCommand(action.getId(), "map-reduce");

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1520829&r1=1520828&r2=1520829&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Sun Sep  8 02:47:39 2013
@@ -64,16 +64,20 @@ import org.apache.oozie.command.wf.Suspe
 import org.apache.oozie.command.wf.WorkflowXCommand;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -196,7 +200,7 @@ public class TestEventGeneration extends
         LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
         wfInstance.start();
         job.setWfInstance(wfInstance);
-        jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
         WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
         new SignalXCommand(job.getId(), wfAction.getId()).call();
         job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
@@ -271,9 +275,9 @@ public class TestEventGeneration extends
         action = jpaService.execute(coordGetCmd);
         WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
         wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
-        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
         action.setStatus(CoordinatorAction.Status.RUNNING);
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
         new CoordActionCheckXCommand(action.getId(), 0).call();
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
@@ -290,9 +294,8 @@ public class TestEventGeneration extends
 
         // Action Failure
         action.setStatus(CoordinatorAction.Status.RUNNING);
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
-        wfJob.setStatus(WorkflowJob.Status.FAILED);
-        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
         new CoordActionCheckXCommand(action.getId(), 0).call();
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
@@ -308,14 +311,14 @@ public class TestEventGeneration extends
 
         // Action start on Coord Resume
         coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
         action.setStatus(CoordinatorAction.Status.SUSPENDED);
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
         wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
         WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
         ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
         wfJob.setWorkflowInstance(wfInstance);
-        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
         new CoordResumeXCommand(coord.getId()).call();
         Thread.sleep(5000);
         CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
@@ -328,7 +331,7 @@ public class TestEventGeneration extends
 
         // Action going to WAITING on Coord Rerun
         action.setStatus(CoordinatorAction.Status.SUCCEEDED);
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
         queue.clear();
         new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true)
                 .call();
@@ -395,7 +398,7 @@ public class TestEventGeneration extends
         action.setStatus(WorkflowAction.Status.KILLED);
         action.setPendingOnly();
         action.setEndTime(null); //its already set by XTestCase add action record method above
-        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
         new ActionKillXCommand(action.getId()).call();
         action = jpaService.execute(wfActionGetCmd);
         assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
@@ -442,7 +445,8 @@ public class TestEventGeneration extends
                 CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
         WorkflowJobBean wjb = new WorkflowJobBean();
         wjb.setId(action.getExternalId());
-        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+        wjb.setLastModifiedTime(new Date());
+        WorkflowJobQueryExecutor.getInstance().insert(wjb);
 
         CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
             @Override
@@ -454,7 +458,7 @@ public class TestEventGeneration extends
 
         // CASE 1: Only pull missing deps
         action.setMissingDependencies("pull");
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
         myCmd.call();
         CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -463,7 +467,7 @@ public class TestEventGeneration extends
         // CASE 2: Only hcat (push) missing deps
         action.setMissingDependencies(null);
         action.setPushMissingDependencies("push");
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
         myCmd.call();
         event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -471,7 +475,7 @@ public class TestEventGeneration extends
 
         // CASE 3: Both types
         action.setMissingDependencies("pull");
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
         myCmd.call();
         event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -533,7 +537,7 @@ public class TestEventGeneration extends
         WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
                 action.getId());
         action.setExternalId(wf.getId());
-        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
 
         String waId = _createWorkflowAction(wf.getId(), "wf-action");
         new ActionStartXCommand(waId, action.getType()).call();
@@ -613,7 +617,7 @@ public class TestEventGeneration extends
                 executionPath, true);
         wfAction.setPending();
         wfAction.setSignalValue(WorkflowAction.Status.OK.name());
-        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);
 
         return workflow;
     }
@@ -623,7 +627,7 @@ public class TestEventGeneration extends
         writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
         String coordXml = coord.getJobXml();
         coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
-        jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
     }
 
     private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBatchQueryExecutor.java Sun Sep  8 02:47:39 2013
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/** Tests the combined insert/update/delete batch call of {@code BatchQueryExecutor}, including rollback. */
+public class TestBatchQueryExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** A single batch call must persist the insert, apply both updates and remove the deleted action. */
+    public void testExecuteBatchUpdateInsertDelete() throws Exception {
+        BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
+        // for update (coordJob, wfJob) and for delete (wfAction)
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        // for insert
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        coordAction.setId("testCoordAction1");
+
+        // update the status
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+
+        // update the list for doing bulk writes
+        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, coordJob));
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wfJob));
+
+        // insert beans
+        Collection<JsonBean> insertList = new ArrayList<JsonBean>();
+        insertList.add(coordAction);
+
+        // delete beans
+        Collection<JsonBean> deleteList = new ArrayList<JsonBean>();
+        deleteList.add(wfAction);
+
+        executor.executeBatchInsertUpdateDelete(insertList, updateList, deleteList);
+
+        // check update after running ExecuteBatchUpdateInsertDelete
+        coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordJob.getId());
+        assertEquals("RUNNING", coordJob.getStatusStr());
+        wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, wfJob.getId());
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+        coordAction = CoordActionQueryExecutor.getInstance()
+                .get(CoordActionQuery.GET_COORD_ACTION, coordAction.getId());
+        assertEquals("testCoordAction1", coordAction.getId());
+        try {
+            // look up the deleted action by its own id; a workflow job id would never match an action row
+            WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, wfAction.getId());
+            fail("Expected E0605 because the workflow action was deleted");
+        }
+        catch (JPAExecutorException ex) {
+            assertEquals(ErrorCode.E0605, ex.getErrorCode());
+        }
+    }
+
+    /** With the commit skipped by fault injection, neither the update nor the inserts may be persisted. */
+    public void testExecuteBatchUpdateInsertDeleteRollBack() throws Exception {
+        BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+        job.setStatus(WorkflowJob.Status.RUNNING);
+
+        // Add two actions to insert list
+        Collection<JsonBean> insertList = new ArrayList<JsonBean>();
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, job));
+
+        // set fault injection to true, so transaction is roll backed
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+        try {
+            executor.executeBatchInsertUpdateDelete(insertList, updateList, null);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception expected) {
+            // the batch call is expected to fail here; the rollback itself is verified below
+        }
+        finally {
+            // always switch the fault off again, even when the fail() above fires
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        // Check whether transactions are rolled back or not
+
+        WorkflowJobBean wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, job.getId());
+        // status should not be RUNNING
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        // neither of the two actions may have been inserted
+        for (WorkflowActionBean action : new WorkflowActionBean[] { action1, action2 }) {
+            try {
+                WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, action.getId());
+                fail("Expected exception but didnt get any");
+            }
+            catch (JPAExecutorException jpaee) {
+                assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+            }
+        }
+    }
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java?rev=1520829&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java Sun Sep  8 02:47:39 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests update-query parameter binding, update execution and insert of {@code CoordJobQueryExecutor}. */
+public class TestCoordJobQueryExecutor extends XDataTestCase {
+    Services services;
+    JPAService jpaService;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        jpaService = Services.get().get(JPAService.class);
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Each update query must carry the bean's current values as its named parameters. */
+    public void testGetQuery() throws Exception {
+        CoordinatorJobBean cjBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        CoordJobQueryExecutor executor = CoordJobQueryExecutor.getInstance();
+        EntityManager em = jpaService.getEntityManager();
+        try {
+            // UPDATE_COORD_JOB
+            Query query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB, cjBean, em);
+            assertEquals(cjBean.getAppName(), query.getParameterValue("appName"));
+            assertEquals(cjBean.getAppPath(), query.getParameterValue("appPath"));
+            assertEquals(cjBean.getConcurrency(), query.getParameterValue("concurrency"));
+            assertEquals(cjBean.getConf(), query.getParameterValue("conf"));
+            assertEquals(cjBean.getExternalId(), query.getParameterValue("externalId"));
+            assertEquals(cjBean.getFrequency(), query.getParameterValue("frequency"));
+            assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+            assertEquals(cjBean.getTimeout(), query.getParameterValue("timeOut"));
+            assertEquals(cjBean.getTimeZone(), query.getParameterValue("timeZone"));
+            assertEquals(cjBean.getCreatedTimestamp(), query.getParameterValue("createdTime"));
+            assertEquals(cjBean.getEndTimestamp(), query.getParameterValue("endTime"));
+            assertEquals(cjBean.getExecution(), query.getParameterValue("execution"));
+            assertEquals(cjBean.getJobXml(), query.getParameterValue("jobXml"));
+            assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastAction"));
+            assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+            assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMaterializedTime"));
+            assertEquals(cjBean.getOrigJobXml(), query.getParameterValue("origJobXml"));
+            assertEquals(cjBean.getSlaXml(), query.getParameterValue("slaXml"));
+            assertEquals(cjBean.getStartTimestamp(), query.getParameterValue("startTime"));
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.getTimeUnit().toString(), query.getParameterValue("timeUnit"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_STATUS
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_BUNDLEID
+            cjBean.setBundleId("bundleID-test");
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, cjBean, em);
+            assertEquals(cjBean.getBundleId(), query.getParameterValue("bundleId"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_STATUS_PENDING
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_STATUS_MODTIME
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_STATUS_PENDING_MODTIME
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+            assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_LAST_MODIFIED_TIME
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, cjBean, em);
+            assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_STATUS_PENDING_TIME -- NOTE(review): "pending" is not asserted here; confirm
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+            assertEquals(cjBean.getSuspendedTimestamp(), query.getParameterValue("suspendedTime"));
+            assertEquals(cjBean.getLastModifiedTimestamp(), query.getParameterValue("lastModifiedTime"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_MATERIALIZE
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, cjBean, em);
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+            assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+            assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastActionTime"));
+            assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+            assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMatdTime"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+
+            // UPDATE_COORD_JOB_CHANGE
+            query = executor.getUpdateQuery(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, cjBean, em);
+            assertEquals(cjBean.getEndTimestamp(), query.getParameterValue("endTime"));
+            assertEquals(cjBean.getStatus().toString(), query.getParameterValue("status"));
+            assertEquals(cjBean.isPending() ? 1 : 0, query.getParameterValue("pending"));
+            assertEquals(cjBean.isDoneMaterialization() ? 1 : 0, query.getParameterValue("doneMaterialization"));
+            assertEquals(cjBean.getConcurrency(), query.getParameterValue("concurrency"));
+            assertEquals(cjBean.getPauseTimestamp(), query.getParameterValue("pauseTime"));
+            assertEquals(cjBean.getLastActionNumber(), query.getParameterValue("lastActionNumber"));
+            assertEquals(cjBean.getLastActionTimestamp(), query.getParameterValue("lastActionTime"));
+            assertEquals(cjBean.getNextMaterializedTimestamp(), query.getParameterValue("nextMatdTime"));
+            assertEquals(cjBean.getId(), query.getParameterValue("id"));
+        }
+        finally {
+            em.close();
+        }
+    }
+
+    /** A status written through an update query must be what a subsequent read returns. */
+    public void testExecuteUpdate() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        job.setStatus(CoordinatorJob.Status.KILLED); // differ from the stored status so the update is observable
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
+        CoordinatorJobBean job1 = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
+        assertEquals(CoordinatorJob.Status.KILLED, job1.getStatus());
+
+        job1.setStatus(CoordinatorJob.Status.SUCCEEDED);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, job1);
+        CoordinatorJobBean job2 = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId());
+        assertEquals(CoordinatorJob.Status.SUCCEEDED, job2.getStatus());
+    }
+
+    public void testGet() throws Exception {
+        // TODO: not implemented yet; this test currently passes without checking anything
+    }
+
+    public void testGetList() throws Exception {
+        // TODO: not implemented yet; this test currently passes without checking anything
+    }
+
+    /** An inserted bean must be readable again with the same application name and user. */
+    public void testInsert() throws Exception {
+        CoordinatorJobBean bean = new CoordinatorJobBean();
+        bean.setId("test-oozie");
+        bean.setAppName("testApp");
+        bean.setUser("oozie");
+        CoordJobQueryExecutor.getInstance().insert(bean);
+        CoordinatorJobBean retBean = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, "test-oozie");
+        assertEquals("testApp", retBean.getAppName());
+        assertEquals("oozie", retBean.getUser());
+    }
+}