You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:29:33 UTC

svn commit: r1520828 [4/5] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java Sun Sep  8 02:29:31 2013
@@ -25,8 +25,7 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
@@ -144,7 +143,7 @@ public class TestCoordActionsKillXComman
                 "coord-action-get.xml", 0);
         action2.setNominalTime(DateUtils.parseDateOozieTZ("2009-12-15T02:00Z"));
         action2.setExternalId(null);
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action2);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action2));
 
         WorkflowJobBean wf = new WorkflowJobBean();
         WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Sun Sep  8 02:29:31 2013
@@ -31,11 +31,11 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
@@ -350,7 +350,7 @@ public class TestCoordChangeXCommand ext
         insertList.add(slaSummaryBean3);
 
         JPAService jpaService = Services.get().get(JPAService.class);
-        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
 
         new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java Sun Sep  8 02:29:31 2013
@@ -32,8 +32,7 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.PartitionDependencyManagerService;
@@ -102,7 +101,7 @@ public class TestCoordKillXCommand exten
         // RUNNINGWITHERROR if it had loaded status and had it as RUNNING in memory when CoordKill
         // executes and updates status to KILLED in database.
         job.setStatus(CoordinatorJob.Status.RUNNINGWITHERROR);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
         job = jpaService.execute(coordJobGetCmd);
         assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
         final CoordMaterializeTransitionXCommand transitionCmd = new CoordMaterializeTransitionXCommand(job.getId(), 3600);
@@ -200,7 +199,7 @@ public class TestCoordKillXCommand exten
         CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
 
         job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
 
         CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
         CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Sun Sep  8 02:29:31 2013
@@ -45,9 +45,8 @@ import org.apache.oozie.coord.CoordELFun
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
@@ -846,7 +845,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -895,7 +894,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -945,7 +944,7 @@ public class TestCoordRerunXCommand exte
         final JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
         coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
 
         CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
                 CoordinatorAction.Status.SUCCEEDED, "coord-rerun-action1.xml", 0);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Sun Sep  8 02:29:31 2013
@@ -20,8 +20,7 @@ package org.apache.oozie.command.coord;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -128,7 +127,7 @@ public class TestCoordResumeXCommand ext
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
         job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
         JPAService jpaService = Services.get().get(JPAService.class);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job));
 
         assertNotNull(jpaService);
         CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Sun Sep  8 02:29:31 2013
@@ -329,7 +329,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus1, action.getStatus());
-        assertTrue(action.isPending() == false);
+        assertTrue(action.getPending() == false);
 
         assertTrue(engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED);
 
@@ -410,7 +410,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus1, action.getStatus());
-        assertFalse(action.isPending());
+        assertFalse(action.getPending());
 
         assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
 
@@ -509,7 +509,7 @@ public class TestActionErrors extends XD
         assertEquals("TEST_ERROR", action.getErrorCode());
         assertEquals(expErrorMsg, action.getErrorMessage());
         assertEquals(expStatus2, action.getStatus());
-        assertTrue(action.isPending() == false);
+        assertTrue(action.getPending() == false);
         assertEquals(WorkflowJob.Status.SUSPENDED, engine.getJob(jobId).getStatus());
         store2.commitTrx();
         store2.closeTrx();

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java Sun Sep  8 02:29:31 2013
@@ -133,7 +133,7 @@ public class TestActionStartXCommand ext
 
         WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
         WorkflowActionBean action = super.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
-        assertFalse(action.isPending());
+        assertFalse(action.getPending());
 
         assertNull(inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP));
         ActionStartXCommand startCmd = new ActionStartXCommand(action.getId(), "map-reduce");

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Sun Sep  8 02:29:31 2013
@@ -64,20 +64,16 @@ import org.apache.oozie.command.wf.Suspe
 import org.apache.oozie.command.wf.WorkflowXCommand;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -200,7 +196,7 @@ public class TestEventGeneration extends
         LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
         wfInstance.start();
         job.setWfInstance(wfInstance);
-        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
         WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
         new SignalXCommand(job.getId(), wfAction.getId()).call();
         job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
@@ -275,9 +271,9 @@ public class TestEventGeneration extends
         action = jpaService.execute(coordGetCmd);
         WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
         wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
-        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         action.setStatus(CoordinatorAction.Status.RUNNING);
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         new CoordActionCheckXCommand(action.getId(), 0).call();
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
@@ -294,8 +290,9 @@ public class TestEventGeneration extends
 
         // Action Failure
         action.setStatus(CoordinatorAction.Status.RUNNING);
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
-        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        wfJob.setStatus(WorkflowJob.Status.FAILED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         new CoordActionCheckXCommand(action.getId(), 0).call();
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
@@ -311,14 +308,14 @@ public class TestEventGeneration extends
 
         // Action start on Coord Resume
         coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
         action.setStatus(CoordinatorAction.Status.SUSPENDED);
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
         WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
         ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
         wfJob.setWorkflowInstance(wfInstance);
-        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         new CoordResumeXCommand(coord.getId()).call();
         Thread.sleep(5000);
         CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
@@ -331,7 +328,7 @@ public class TestEventGeneration extends
 
         // Action going to WAITING on Coord Rerun
         action.setStatus(CoordinatorAction.Status.SUCCEEDED);
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         queue.clear();
         new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true)
                 .call();
@@ -398,7 +395,7 @@ public class TestEventGeneration extends
         action.setStatus(WorkflowAction.Status.KILLED);
         action.setPendingOnly();
         action.setEndTime(null); //its already set by XTestCase add action record method above
-        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
+        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
         new ActionKillXCommand(action.getId()).call();
         action = jpaService.execute(wfActionGetCmd);
         assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
@@ -445,8 +442,7 @@ public class TestEventGeneration extends
                 CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
         WorkflowJobBean wjb = new WorkflowJobBean();
         wjb.setId(action.getExternalId());
-        wjb.setLastModifiedTime(new Date());
-        WorkflowJobQueryExecutor.getInstance().insert(wjb);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
 
         CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
             @Override
@@ -458,7 +454,7 @@ public class TestEventGeneration extends
 
         // CASE 1: Only pull missing deps
         action.setMissingDependencies("pull");
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         myCmd.call();
         CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -467,7 +463,7 @@ public class TestEventGeneration extends
         // CASE 2: Only hcat (push) missing deps
         action.setMissingDependencies(null);
         action.setPushMissingDependencies("push");
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         myCmd.call();
         event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -475,7 +471,7 @@ public class TestEventGeneration extends
 
         // CASE 3: Both types
         action.setMissingDependencies("pull");
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         myCmd.call();
         event = (CoordinatorActionEvent) queue.poll();
         assertNotNull(event);
@@ -537,7 +533,7 @@ public class TestEventGeneration extends
         WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
                 action.getId());
         action.setExternalId(wf.getId());
-        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
 
         String waId = _createWorkflowAction(wf.getId(), "wf-action");
         new ActionStartXCommand(waId, action.getType()).call();
@@ -617,7 +613,7 @@ public class TestEventGeneration extends
                 executionPath, true);
         wfAction.setPending();
         wfAction.setSignalValue(WorkflowAction.Status.OK.name());
-        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);
+        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
 
         return workflow;
     }
@@ -627,7 +623,7 @@ public class TestEventGeneration extends
         writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
         String coordXml = coord.getJobXml();
         coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
-        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
     }
 
     private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Test cases for bulk JPA writes - update and delete operations
+ */
+public class TestBulkUpdateDeleteJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Test bulk updates by updating coordinator job, workflow job and workflow action
+     * @throws Exception
+     */
+    public void testUpdates() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        // update the status
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        action.setStatus(WorkflowAction.Status.RUNNING);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // update the list for doing bulk writes
+        updateList.add(coordJob);
+        updateList.add(wfJob);
+        updateList.add(action);
+        BulkUpdateDeleteJPAExecutor bulkUpdateCmd = new BulkUpdateDeleteJPAExecutor();
+        bulkUpdateCmd.setUpdateList(updateList);
+        jpaService.execute(bulkUpdateCmd);
+
+        // check for expected status after running bulkUpdateJPA
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("RUNNING", coordJob.getStatusStr());
+
+        wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+
+        WorkflowActionBean action2 = jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+        assertEquals(WorkflowAction.Status.RUNNING, action2.getStatus());
+
+    }
+
+    /**
+     * Test bulk deletes by deleting a coord action and a wf action
+     * @throws Exception
+     */
+    public void testDeletes() throws Exception{
+        CoordinatorActionBean action1 = addRecordToCoordActionTable("000-123-C", 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0);
+        WorkflowActionBean action2 = addRecordToWfActionTable("000-123-W", "2", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        List<JsonBean> deleteList = new ArrayList<JsonBean>();
+        // add the coord action and the wf action to the delete list
+        deleteList.add(action1);
+        deleteList.add(action2);
+
+        BulkUpdateDeleteJPAExecutor bulkDelRerunCmd = new BulkUpdateDeleteJPAExecutor();
+        bulkDelRerunCmd.setDeleteList(deleteList);
+        jpaService.execute(bulkDelRerunCmd);
+
+        // check for non existence after running bulkDeleteJPA
+        try {
+            jpaService.execute(new CoordActionGetJPAExecutor(action1.getId()));
+            fail(); //should not be found
+        }
+        catch(JPAExecutorException jex) {
+            assertEquals(ErrorCode.E0605, jex.getErrorCode());
+        }
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+            fail(); //should not be found
+        }
+        catch(JPAExecutorException jex) {
+            assertEquals(ErrorCode.E0605, jex.getErrorCode());
+        }
+    }
+
+    /**
+     * Test bulk updates and deletes in one executor call:
+     * updates a workflow job and deletes its action
+     *
+     * @throws Exception
+     */
+    public void testBulkUpdatesDeletes() throws Exception{
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
+        WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add job to update
+        updateList.add(job);
+
+        List<JsonBean> deleteList = new ArrayList<JsonBean>();
+        //Add action to delete
+        deleteList.add(action);
+
+        BulkUpdateDeleteJPAExecutor bulkDelRerunCmd = new BulkUpdateDeleteJPAExecutor();
+        bulkDelRerunCmd.setUpdateList(updateList);
+        bulkDelRerunCmd.setDeleteList(deleteList);
+        jpaService.execute(bulkDelRerunCmd);
+
+        // check for update after running bulkJPA. job should be updated from KILLED -> RUNNING
+        job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
+        assertEquals("RUNNING", job.getStatusStr());
+
+        // check for non existence after running bulkJPA
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+            fail(); //should not be found
+        }
+        catch(JPAExecutorException jex) {
+            assertEquals(ErrorCode.E0605, jex.getErrorCode());
+        }
+    }
+
+    /**
+     * Test bulk updates and deletes rollback
+     *
+     * @throws Exception
+     */
+    public void testBulkUpdatesDeletesRollback() throws Exception{
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = addRecordToWfActionTable(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        List<JsonBean> deleteList = new ArrayList<JsonBean>();
+        // Add two actions to delete list
+        deleteList.add(action1);
+        deleteList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add to update list
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateDeleteJPAExecutor wfUpdateCmd1 = new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true);
+
+        // set fault injection to true, so the transaction is rolled back
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        try {
+            jpaService.execute(wfUpdateCmd1);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception e) {
+        }
+        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+
+        // Check whether transactions are rolled back or not
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        // status should NOT be RUNNING
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("WF action should not be removed due to rollback but was not found");
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+        }
+        catch (JPAExecutorException je) {
+            fail("WF action should not be removed due to rollback but was not found");
+        }
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - insert and update operations for Coord Action
+ * Start command
+ */
+public class TestBulkUpdateInsertForCoordActionStartJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services(); // a fresh Services instance is created and initialized for every test
+        services.init();
+        cleanUpDBTables(); // NOTE(review): helper is defined outside this file; presumably empties the DB tables - confirm
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy(); // destroy the Services instance created in setUp() before the parent tear-down runs
+        super.tearDown();
+    }
+
+    /**
+     * Test bulk updates: persist status changes of a coordinator job, a workflow job and a workflow action in one call
+     * @throws Exception
+     */
+    public void testUpdates() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        // change the status of each in-memory bean
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        action.setStatus(WorkflowAction.Status.RUNNING);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // collect the modified beans into the update list for the bulk write
+        updateList.add(coordJob);
+        updateList.add(wfJob);
+        updateList.add(action);
+        BulkUpdateInsertForCoordActionStartJPAExecutor bulkUpdateCmd = new BulkUpdateInsertForCoordActionStartJPAExecutor();
+        bulkUpdateCmd.setUpdateList(updateList);
+        jpaService.execute(bulkUpdateCmd);
+
+        // re-read each record through the JPA service and check that it carries the new status
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("RUNNING", coordJob.getStatusStr());
+
+        wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+
+        WorkflowActionBean action2 = jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+        assertEquals(WorkflowAction.Status.RUNNING, action2.getStatus());
+
+    }
+
+    /**
+     * Test bulk inserts by inserting a workflow job and two workflow actions in one call and reading them back
+     * @throws Exception
+     */
+    public void testInserts() throws Exception{
+        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+                .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+        Configuration conf = new Configuration();
+        Path appUri = new Path(getAppPath(), "workflow.xml");
+        conf.set(OozieClient.APP_PATH, appUri.toString());
+        conf.set(OozieClient.LOG_TOKEN, "testToken");
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        WorkflowJobBean job = createWorkflow(app, conf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // queue the two actions and the workflow job for insertion (the actions are added before their job)
+        insertList.add(action1);
+        insertList.add(action2);
+        insertList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BulkUpdateInsertForCoordActionStartJPAExecutor bulkInsertCmd = new BulkUpdateInsertForCoordActionStartJPAExecutor();
+        bulkInsertCmd.setInsertList(insertList);
+        jpaService.execute(bulkInsertCmd);
+
+        // read each inserted record back and check it has the PREP status it was created with
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(wfGetCmd);
+        assertEquals("PREP", job.getStatusStr());
+
+    }
+
+    /**
+     * Test bulk inserts and updates in a single executor call: two workflow
+     * actions are inserted while a coordinator job and a workflow job are updated
+     *
+     * @throws Exception
+     */
+    public void testBulkInsertUpdates() throws Exception{
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        coordJob.setStatus(Job.Status.SUCCEEDED);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add the two new actions to the insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add the coordinator job and the workflow job to the update list
+        updateList.add(coordJob);
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateInsertForCoordActionStartJPAExecutor bulkUpdateCmd = new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList);
+        jpaService.execute(bulkUpdateCmd); // both lists are handed over in this one call
+
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("SUCCEEDED", coordJob.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        assertEquals("RUNNING", wfBean.getStatusStr());
+
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+    }
+
+    /**
+     * Test that neither the bulk inserts nor the updates are persisted when the commit fails
+     *
+     * @throws Exception
+     */
+    public void testBulkInsertUpdatesRollback() throws Exception{
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add the two new actions to the insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add the modified workflow job to the update list
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateInsertForCoordActionStartJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList);
+
+        // turn on fault injection so that the commit fails and the transaction is rolled back
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        try {
+            jpaService.execute(wfUpdateCmd1);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception e) { // expected path: the failed commit surfaces as an exception, nothing else to check here
+        }
+        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+
+        // Verify that the transaction was rolled back
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        // the RUNNING update must not have been persisted; the stored job is still PREP
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        try {
+            action1 = jpaService.execute(actionGetCmd);
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode()); // NOTE(review): E0605 is presumably the "action not found" code - confirm in ErrorCode
+        }
+
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        try {
+            action2 = jpaService.execute(actionGetCmd);
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode()); // same check for the second action that was never inserted
+        }
+
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - insert and update operations for Coord Action
+ * Status commands
+ */
+public class TestBulkUpdateInsertForCoordActionStatusJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services(); // a fresh Services instance is created and initialized for every test
+        services.init();
+        cleanUpDBTables(); // NOTE(review): helper is defined outside this file; presumably empties the DB tables - confirm
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy(); // destroy the Services instance created in setUp() before the parent tear-down runs
+        super.tearDown();
+    }
+
+    /**
+     * Test bulk updates: persist status changes of a coordinator job, a workflow job and a workflow action in one call
+     * @throws Exception
+     */
+    public void testUpdates() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        // change the status of each in-memory bean
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        action.setStatus(WorkflowAction.Status.RUNNING);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // collect the modified beans into the update list for the bulk write
+        updateList.add(coordJob);
+        updateList.add(wfJob);
+        updateList.add(action);
+        BulkUpdateInsertForCoordActionStatusJPAExecutor bulkUpdateCmd = new BulkUpdateInsertForCoordActionStatusJPAExecutor();
+        bulkUpdateCmd.setUpdateList(updateList);
+        jpaService.execute(bulkUpdateCmd);
+
+        // re-read each record through the JPA service and check that it carries the new status
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("RUNNING", coordJob.getStatusStr());
+
+        wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+
+        WorkflowActionBean action2 = jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+        assertEquals(WorkflowAction.Status.RUNNING, action2.getStatus());
+
+    }
+
+    /**
+     * Test bulk inserts by inserting a workflow job and two workflow actions in one call and reading them back
+     * @throws Exception
+     */
+    public void testInserts() throws Exception{
+        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+                .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+        Configuration conf = new Configuration();
+        Path appUri = new Path(getAppPath(), "workflow.xml");
+        conf.set(OozieClient.APP_PATH, appUri.toString());
+        conf.set(OozieClient.LOG_TOKEN, "testToken");
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        WorkflowJobBean job = createWorkflow(app, conf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // queue the two actions and the workflow job for insertion (the actions are added before their job)
+        insertList.add(action1);
+        insertList.add(action2);
+        insertList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BulkUpdateInsertForCoordActionStatusJPAExecutor bulkInsertCmd = new BulkUpdateInsertForCoordActionStatusJPAExecutor();
+        bulkInsertCmd.setInsertList(insertList);
+        jpaService.execute(bulkInsertCmd);
+
+        // read each inserted record back and check it has the PREP status it was created with
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(wfGetCmd);
+        assertEquals("PREP", job.getStatusStr());
+
+    }
+
+    /**
+     * Test bulk inserts and updates in a single executor call: two workflow
+     * actions are inserted while a coordinator job and a workflow job are updated
+     *
+     * @throws Exception
+     */
+    public void testBulkInsertUpdates() throws Exception{
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        coordJob.setStatus(Job.Status.SUCCEEDED);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add the two new actions to the insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add the coordinator job and the workflow job to the update list
+        updateList.add(coordJob);
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateInsertForCoordActionStatusJPAExecutor bulkUpdateCmd = new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList);
+        jpaService.execute(bulkUpdateCmd); // both lists are handed over in this one call
+
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("SUCCEEDED", coordJob.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        assertEquals("RUNNING", wfBean.getStatusStr());
+
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+    }
+
+    /**
+     * Test that neither the bulk inserts nor the updates are persisted when the commit fails
+     *
+     * @throws Exception
+     */
+    public void testBulkInsertUpdatesRollback() throws Exception{
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add the two new actions to the insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add the modified workflow job to the update list
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateInsertForCoordActionStatusJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList);
+
+        // turn on fault injection so that the commit fails and the transaction is rolled back
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        try {
+            jpaService.execute(wfUpdateCmd1);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception e) { // expected path: the failed commit surfaces as an exception, nothing else to check here
+        }
+        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+
+        // Verify that the transaction was rolled back
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        // the RUNNING update must not have been persisted; the stored job is still PREP
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        try {
+            action1 = jpaService.execute(actionGetCmd);
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode()); // NOTE(review): E0605 is presumably the "action not found" code - confirm in ErrorCode
+        }
+
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        try {
+            action2 = jpaService.execute(actionGetCmd);
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode()); // same check for the second action that was never inserted
+        }
+
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - inserts and updates
+ */
+public class TestBulkUpdateInsertJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services(); // a fresh Services instance is created and initialized for every test
+        services.init();
+        cleanUpDBTables(); // NOTE(review): helper is defined outside this file; presumably empties the DB tables - confirm
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy(); // destroy the Services instance created in setUp() before the parent tear-down runs
+        super.tearDown();
+    }
+
+    /**
+     * Test bulk updates: persist status changes of a coordinator job, a workflow job and a workflow action in one call
+     * @throws Exception
+     */
+    public void testUpdates() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        // change the status of each in-memory bean
+        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        action.setStatus(WorkflowAction.Status.RUNNING);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // collect the modified beans into the update list for the bulk write
+        updateList.add(coordJob);
+        updateList.add(wfJob);
+        updateList.add(action);
+        BulkUpdateInsertJPAExecutor bulkUpdateCmd = new BulkUpdateInsertJPAExecutor();
+        bulkUpdateCmd.setUpdateList(updateList);
+        jpaService.execute(bulkUpdateCmd);
+
+        // re-read each record through the JPA service and check that it carries the new status
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("RUNNING", coordJob.getStatusStr());
+
+        wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+        assertEquals("SUCCEEDED", wfJob.getStatusStr());
+
+        WorkflowActionBean action2 = jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+        assertEquals(WorkflowAction.Status.RUNNING, action2.getStatus());
+
+    }
+
+    /**
+     * Test bulk inserts by inserting a workflow job and two workflow actions in one call and reading them back
+     * @throws Exception
+     */
+    public void testInserts() throws Exception{
+        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+                .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+        Configuration conf = new Configuration();
+        Path appUri = new Path(getAppPath(), "workflow.xml");
+        conf.set(OozieClient.APP_PATH, appUri.toString());
+        conf.set(OozieClient.LOG_TOKEN, "testToken");
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        WorkflowJobBean job = createWorkflow(app, conf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // queue the two actions and the workflow job for insertion (the actions are added before their job)
+        insertList.add(action1);
+        insertList.add(action2);
+        insertList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BulkUpdateInsertJPAExecutor bulkInsertCmd = new BulkUpdateInsertJPAExecutor();
+        bulkInsertCmd.setInsertList(insertList);
+        jpaService.execute(bulkInsertCmd);
+
+        // read each inserted record back and check it has the PREP status it was created with
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(wfGetCmd);
+        assertEquals("PREP", job.getStatusStr());
+
+    }
+
+    /**
+     * Test bulk inserts and updates in a single executor call: two workflow
+     * actions are inserted while a coordinator job and a workflow job are updated
+     *
+     * @throws Exception
+     */
+    public void testBulkInsertUpdates() throws Exception{
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        coordJob.setStatus(Job.Status.SUCCEEDED);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add the two new actions to the insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add the coordinator job and the workflow job to the update list
+        updateList.add(coordJob);
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        BulkUpdateInsertJPAExecutor bulkUpdateCmd = new BulkUpdateInsertJPAExecutor(updateList, insertList);
+        jpaService.execute(bulkUpdateCmd); // both lists are handed over in this one call
+
+        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+        assertEquals("SUCCEEDED", coordJob.getStatusStr());
+
+        WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+        assertEquals("RUNNING", wfBean.getStatusStr());
+
+        WorkflowActionGetJPAExecutor actionGetCmd = new WorkflowActionGetJPAExecutor(action1.getId());
+        action1 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action1.getStatusStr());
+
+        actionGetCmd = new WorkflowActionGetJPAExecutor(action2.getId());
+        action2 = jpaService.execute(actionGetCmd);
+        assertEquals("PREP", action2.getStatusStr());
+    }
+
+    /**
+     * Test that a failed commit rolls back the whole bulk insert/update: the job
+     * keeps its old status and neither workflow action can be found afterwards.
+     *
+     * @throws Exception if a fixture helper or an unexpected JPA call fails
+     */
+    public void testBulkInsertUpdatesRollback() throws Exception {
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+        WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+        job.setStatus(WorkflowJob.Status.RUNNING);
+        List<JsonBean> insertList = new ArrayList<JsonBean>();
+        // Add two actions to insert list
+        insertList.add(action1);
+        insertList.add(action2);
+
+        List<JsonBean> updateList = new ArrayList<JsonBean>();
+        // Add to update list
+        updateList.add(job);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BulkUpdateInsertJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertJPAExecutor(updateList, insertList);
+
+        // set fault injection to true, so the transaction is rolled back
+        setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+        setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+        try {
+            jpaService.execute(wfUpdateCmd1);
+            fail("Expected exception due to commit failure but didn't get any");
+        }
+        catch (Exception expected) {
+            // the injected commit failure is exactly the outcome this test wants
+        }
+        finally {
+            // deactivate even when fail() fires, so an active fault is never left behind
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        // Check whether transactions are rolled back or not
+        WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
+        // status should not be RUNNING
+        assertEquals("PREP", wfBean.getStatusStr());
+
+        // neither action may exist: each lookup has to fail with E0605
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+            fail("Expected exception but didnt get any");
+        }
+        catch (JPAExecutorException jpaee) {
+            assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+        }
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Checks that CoordActionUpdateForInputCheckJPAExecutor stores the status,
+ * action XML and missing dependencies of a coordinator action.
+ */
+public class TestCoordActionUpdateForInputCheckJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Creates and initializes a new Services instance, then calls cleanUpDBTables(). */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the Services instance before delegating to the base class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Updates three fields of a RUNNING action and verifies the values read back.
+     *
+     * @throws Exception if a fixture helper or a JPA executor call fails
+     */
+    public void testCoordActionUpdateStatus() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        action.setActionXml("dummyXml");
+        action.setMissingDependencies("dummyDependencies");
+        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+
+        CoordinatorActionBean stored = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(stored);
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, stored.getStatus());
+        assertEquals("dummyXml", stored.getActionXml());
+        assertEquals("dummyDependencies", stored.getMissingDependencies());
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Checks that CoordActionUpdateForModifiedTimeJPAExecutor stores a newer
+ * last-modified time for a coordinator action.
+ */
+public class TestCoordActionUpdateForModifiedTimeJPAExecutor extends XDataTestCase {
+    Services services;
+
+    // Plain overrides, as in the sibling executor tests; no JUnit 4 annotations.
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Expects the executor to move the stored last-modified time of an action past "now".
+     */
+    public void testCoordActionUpdateModifiedTime() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        // Reference instant; the fixture's timestamp must already lie strictly before it.
+        Date currentDate = new Date();
+        assertTrue(currentDate.getTime() - action.getLastModifiedTime().getTime() > 0);
+
+        jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(action));
+
+        CoordinatorActionBean newAction = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(newAction);
+        // The stored timestamp must now lie strictly after the reference instant.
+        assertTrue(newAction.getLastModifiedTime().getTime() - currentDate.getTime() > 0);
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Checks that CoordActionUpdateJPAExecutor persists a status change of a
+ * coordinator action.
+ */
+public class TestCoordActionUpdateJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Marks a RUNNING action SUCCEEDED, runs the update and verifies the stored status.
+     */
+    public void testCoordActionUpdate() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+
+        CoordinatorActionBean newAction = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(newAction);
+        // expected value goes first, so a failure message labels the two sides correctly
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, newAction.getStatus());
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Checks that CoordActionUpdateStatusJPAExecutor persists the status and the
+ * pending counter of a coordinator action.
+ */
+public class TestCoordActionUpdateStatusJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Changes status and pending of a RUNNING action, runs the update and
+     * verifies both values as read back through CoordActionGetJPAExecutor.
+     */
+    public void testCoordActionUpdateStatus() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        // Update the status of action to "SUCCEEDED" from "RUNNING"
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        // Update pending to 1 from 0
+        action.setPending(1);
+
+        // Call the JPAUpdate executor to execute the Update command
+        jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
+
+        CoordinatorActionBean newAction = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(newAction);
+        // expected value goes first, so a failure message labels the two sides correctly
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, newAction.getStatus());
+        assertEquals(1, newAction.getPending());
+    }
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java Sun Sep  8 02:29:31 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Checks that CoordJobUpdateJPAExecutor persists a status change of a coordinator job. */
+public class TestCoordJobUpdateJPAExecutor extends XDataTestCase {
+    Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Loads a RUNNING job, marks it SUCCEEDED, runs the update and re-reads the job.
+     */
+    public void testCoordJobUpdate() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordJobGetJPAExecutor jobGetCmd = new CoordJobGetJPAExecutor(job.getId());
+        CoordinatorJobBean job1 = jpaService.execute(jobGetCmd);
+        job1.setStatus(CoordinatorJob.Status.SUCCEEDED);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job1));
+
+        CoordinatorJobBean job2 = jpaService.execute(jobGetCmd);
+        // expected value goes first, so a failure message labels the two sides correctly
+        assertEquals(CoordinatorJob.Status.SUCCEEDED, job2.getStatus());
+    }
+}