You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/08 04:29:33 UTC
svn commit: r1520828 [4/5] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java...
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java Sun Sep 8 02:29:31 2013
@@ -25,8 +25,7 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
@@ -144,7 +143,7 @@ public class TestCoordActionsKillXComman
"coord-action-get.xml", 0);
action2.setNominalTime(DateUtils.parseDateOozieTZ("2009-12-15T02:00Z"));
action2.setExternalId(null);
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action2);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action2));
WorkflowJobBean wf = new WorkflowJobBean();
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java Sun Sep 8 02:29:31 2013
@@ -31,11 +31,11 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.JPAService;
@@ -350,7 +350,7 @@ public class TestCoordChangeXCommand ext
insertList.add(slaSummaryBean3);
JPAService jpaService = Services.get().get(JPAService.class);
- BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java Sun Sep 8 02:29:31 2013
@@ -32,8 +32,7 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
@@ -102,7 +101,7 @@ public class TestCoordKillXCommand exten
// RUNNINGWITHERROR if it had loaded status and had it as RUNNING in memory when CoordKill
// executes and updates status to KILLED in database.
job.setStatus(CoordinatorJob.Status.RUNNINGWITHERROR);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(job));
job = jpaService.execute(coordJobGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
final CoordMaterializeTransitionXCommand transitionCmd = new CoordMaterializeTransitionXCommand(job.getId(), 3600);
@@ -200,7 +199,7 @@ public class TestCoordKillXCommand exten
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(job));
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java Sun Sep 8 02:29:31 2013
@@ -45,9 +45,8 @@ import org.apache.oozie.coord.CoordELFun
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
@@ -846,7 +845,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -895,7 +894,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.FAILED, "coord-rerun-action1.xml", 0);
@@ -945,7 +944,7 @@ public class TestCoordRerunXCommand exte
final JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
coordJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, coordJob);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.SUCCEEDED, "coord-rerun-action1.xml", 0);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordResumeXCommand.java Sun Sep 8 02:29:31 2013
@@ -20,8 +20,7 @@ package org.apache.oozie.command.coord;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -128,7 +127,7 @@ public class TestCoordResumeXCommand ext
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
JPAService jpaService = Services.get().get(JPAService.class);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(job));
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Sun Sep 8 02:29:31 2013
@@ -329,7 +329,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus1, action.getStatus());
- assertTrue(action.isPending() == false);
+ assertTrue(action.getPending() == false);
assertTrue(engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED);
@@ -410,7 +410,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus1, action.getStatus());
- assertFalse(action.isPending());
+ assertFalse(action.getPending());
assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus());
@@ -509,7 +509,7 @@ public class TestActionErrors extends XD
assertEquals("TEST_ERROR", action.getErrorCode());
assertEquals(expErrorMsg, action.getErrorMessage());
assertEquals(expStatus2, action.getStatus());
- assertTrue(action.isPending() == false);
+ assertTrue(action.getPending() == false);
assertEquals(WorkflowJob.Status.SUSPENDED, engine.getJob(jobId).getStatus());
store2.commitTrx();
store2.closeTrx();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java Sun Sep 8 02:29:31 2013
@@ -133,7 +133,7 @@ public class TestActionStartXCommand ext
WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = super.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
- assertFalse(action.isPending());
+ assertFalse(action.getPending());
assertNull(inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP));
ActionStartXCommand startCmd = new ActionStartXCommand(action.getId(), "map-reduce");
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Sun Sep 8 02:29:31 2013
@@ -64,20 +64,16 @@ import org.apache.oozie.command.wf.Suspe
import org.apache.oozie.command.wf.WorkflowXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -200,7 +196,7 @@ public class TestEventGeneration extends
LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
wfInstance.start();
job.setWfInstance(wfInstance);
- WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
new SignalXCommand(job.getId(), wfAction.getId()).call();
job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
@@ -275,9 +271,9 @@ public class TestEventGeneration extends
action = jpaService.execute(coordGetCmd);
WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
- WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
action.setStatus(CoordinatorAction.Status.RUNNING);
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
@@ -294,8 +290,9 @@ public class TestEventGeneration extends
// Action Failure
action.setStatus(CoordinatorAction.Status.RUNNING);
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
- WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ wfJob.setStatus(WorkflowJob.Status.FAILED);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
@@ -311,14 +308,14 @@ public class TestEventGeneration extends
// Action start on Coord Resume
coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
action.setStatus(CoordinatorAction.Status.SUSPENDED);
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
wfJob.setWorkflowInstance(wfInstance);
- WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
new CoordResumeXCommand(coord.getId()).call();
Thread.sleep(5000);
CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
@@ -331,7 +328,7 @@ public class TestEventGeneration extends
// Action going to WAITING on Coord Rerun
action.setStatus(CoordinatorAction.Status.SUCCEEDED);
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
queue.clear();
new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true)
.call();
@@ -398,7 +395,7 @@ public class TestEventGeneration extends
action.setStatus(WorkflowAction.Status.KILLED);
action.setPendingOnly();
action.setEndTime(null); //its already set by XTestCase add action record method above
- WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
+ jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
new ActionKillXCommand(action.getId()).call();
action = jpaService.execute(wfActionGetCmd);
assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
@@ -445,8 +442,7 @@ public class TestEventGeneration extends
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId(action.getExternalId());
- wjb.setLastModifiedTime(new Date());
- WorkflowJobQueryExecutor.getInstance().insert(wjb);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@Override
@@ -458,7 +454,7 @@ public class TestEventGeneration extends
// CASE 1: Only pull missing deps
action.setMissingDependencies("pull");
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
myCmd.call();
CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -467,7 +463,7 @@ public class TestEventGeneration extends
// CASE 2: Only hcat (push) missing deps
action.setMissingDependencies(null);
action.setPushMissingDependencies("push");
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -475,7 +471,7 @@ public class TestEventGeneration extends
// CASE 3: Both types
action.setMissingDependencies("pull");
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
@@ -537,7 +533,7 @@ public class TestEventGeneration extends
WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
action.getId());
action.setExternalId(wf.getId());
- CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
String waId = _createWorkflowAction(wf.getId(), "wf-action");
new ActionStartXCommand(waId, action.getType()).call();
@@ -617,7 +613,7 @@ public class TestEventGeneration extends
executionPath, true);
wfAction.setPending();
wfAction.setSignalValue(WorkflowAction.Status.OK.name());
- WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);
+ jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
return workflow;
}
@@ -627,7 +623,7 @@ public class TestEventGeneration extends
writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
String coordXml = coord.getJobXml();
coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
}
private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateDeleteJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+/**
+ * Testcases for bulk JPA writes - update and delete operations
+ */
+public class TestBulkUpdateDeleteJPAExecutor extends XDataTestCase {
+ Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ /**
+ * Test bulk updates by updating coordinator job, workflow job and workflow action
+ * @throws Exception
+ */
+ public void testUpdates() throws Exception {
+ CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+ WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+ WorkflowActionBean action = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ // update the status
+ coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+ wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+ action.setStatus(WorkflowAction.Status.RUNNING);
+
+ List<JsonBean> updateList = new ArrayList<JsonBean>();
+ // update the list for doing bulk writes
+ updateList.add(coordJob);
+ updateList.add(wfJob);
+ updateList.add(action);
+ BulkUpdateDeleteJPAExecutor bulkUpdateCmd = new BulkUpdateDeleteJPAExecutor();
+ bulkUpdateCmd.setUpdateList(updateList);
+ jpaService.execute(bulkUpdateCmd);
+
+ // check for expected status after running bulkUpdateJPA
+ coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordJob.getId()));
+ assertEquals("RUNNING", coordJob.getStatusStr());
+
+ wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+ assertEquals("SUCCEEDED", wfJob.getStatusStr());
+
+ WorkflowActionBean action2 = jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+ assertEquals(WorkflowAction.Status.RUNNING, action2.getStatus());
+
+ }
+
+ /**
+ * Test bulk deletes by deleting a coord action and a wf action
+ * @throws Exception
+ */
+ public void testDeletes() throws Exception{
+ CoordinatorActionBean action1 = addRecordToCoordActionTable("000-123-C", 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0);
+ WorkflowActionBean action2 = addRecordToWfActionTable("000-123-W", "2", WorkflowAction.Status.PREP);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ List<JsonBean> deleteList = new ArrayList<JsonBean>();
+ // add the coord action and the workflow action to the delete list
+ deleteList.add(action1);
+ deleteList.add(action2);
+
+ BulkUpdateDeleteJPAExecutor bulkDelRerunCmd = new BulkUpdateDeleteJPAExecutor();
+ bulkDelRerunCmd.setDeleteList(deleteList);
+ jpaService.execute(bulkDelRerunCmd);
+
+ // check for non existence after running bulkDeleteJPA
+ try {
+ jpaService.execute(new CoordActionGetJPAExecutor(action1.getId()));
+ fail(); //should not be found
+ }
+ catch(JPAExecutorException jex) {
+ assertEquals(ErrorCode.E0605, jex.getErrorCode());
+ }
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+ fail(); //should not be found
+ }
+ catch(JPAExecutorException jex) {
+ assertEquals(ErrorCode.E0605, jex.getErrorCode());
+ }
+ }
+
+ /**
+ * Test bulk updates and deletes in a single executor call:
+ * updates a workflow job and deletes a workflow action
+ *
+ * @throws Exception
+ */
+ public void testBulkUpdatesDeletes() throws Exception{
+ WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
+ WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ job.setStatus(WorkflowJob.Status.RUNNING);
+ List<JsonBean> updateList = new ArrayList<JsonBean>();
+ // Add job to update
+ updateList.add(job);
+
+ List<JsonBean> deleteList = new ArrayList<JsonBean>();
+ //Add action to delete
+ deleteList.add(action);
+
+ BulkUpdateDeleteJPAExecutor bulkDelRerunCmd = new BulkUpdateDeleteJPAExecutor();
+ bulkDelRerunCmd.setUpdateList(updateList);
+ bulkDelRerunCmd.setDeleteList(deleteList);
+ jpaService.execute(bulkDelRerunCmd);
+
+ // check for update after running bulkJPA. job should be updated from KILLED -> RUNNING
+ job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
+ assertEquals("RUNNING", job.getStatusStr());
+
+ // check for non existence after running bulkJPA
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(action.getId()));
+ fail(); //should not be found
+ }
+ catch(JPAExecutorException jex) {
+ assertEquals(ErrorCode.E0605, jex.getErrorCode());
+ }
+ }
+
+ /**
+ * Test bulk updates and deletes rollback
+ *
+ * @throws Exception
+ */
+ public void testBulkUpdatesDeletesRollback() throws Exception{
+ WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+ WorkflowActionBean action1 = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
+ WorkflowActionBean action2 = addRecordToWfActionTable(job.getId(), "2", WorkflowAction.Status.PREP);
+
+ job.setStatus(WorkflowJob.Status.RUNNING);
+ List<JsonBean> deleteList = new ArrayList<JsonBean>();
+ // Add two actions to delete list
+ deleteList.add(action1);
+ deleteList.add(action2);
+
+ List<JsonBean> updateList = new ArrayList<JsonBean>();
+ // Add to update list
+ updateList.add(job);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ BulkUpdateDeleteJPAExecutor wfUpdateCmd1 = new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true);
+
+ // set fault injection to true, so the transaction is rolled back
+ setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+ setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+ try {
+ jpaService.execute(wfUpdateCmd1);
+ fail("Expected exception due to commit failure but didn't get any");
+ }
+ catch (Exception e) {
+ }
+ FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+
+ // Check whether transactions are rolled back or not
+ WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+ WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+ // status should NOT be RUNNING
+ assertEquals("PREP", wfBean.getStatusStr());
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("WF action should not be removed due to rollback but was not found");
+ }
+
+ try {
+ jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+ }
+ catch (JPAExecutorException je) {
+ fail("WF action should not be removed due to rollback but was not found");
+ }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStartJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - insert and update operations for Coord Action
+ * Start command
+ */
+public class TestBulkUpdateInsertForCoordActionStartJPAExecutor extends XDataTestCase {
+ Services services;
+
+ /**
+  * Runs the parent set-up, then creates and initializes a new {@link Services}
+  * instance and calls {@code cleanUpDBTables()} before each test.
+  *
+  * @throws Exception if the parent set-up, service initialization or table clean-up fails
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     this.services = new Services();
+     this.services.init();
+     cleanUpDBTables();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} and then
+  * runs the parent tear-down.
+  *
+  * @throws Exception if the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     this.services.destroy();
+     super.tearDown();
+ }
+
+ /**
+  * Checks that one bulk update writes the changed status of a coordinator
+  * job, a workflow job and a workflow action.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testUpdates() throws Exception {
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfBean = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean actionBean = addRecordToWfActionTable(wfBean.getId(), "1", WorkflowAction.Status.PREP);
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     // Change the status on the in-memory beans only; the bulk executor below writes them.
+     coordBean.setStatus(CoordinatorJob.Status.RUNNING);
+     wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+     actionBean.setStatus(WorkflowAction.Status.RUNNING);
+
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfBean);
+     toUpdate.add(actionBean);
+
+     BulkUpdateInsertForCoordActionStartJPAExecutor executor = new BulkUpdateInsertForCoordActionStartJPAExecutor();
+     executor.setUpdateList(toUpdate);
+     jpa.execute(executor);
+
+     // Read every bean back and compare against the status set above.
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("RUNNING", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedWf = jpa.execute(new WorkflowJobGetJPAExecutor(wfBean.getId()));
+     assertEquals("SUCCEEDED", storedWf.getStatusStr());
+
+     WorkflowActionBean storedAction = jpa.execute(new WorkflowActionGetJPAExecutor(actionBean.getId()));
+     assertEquals(WorkflowAction.Status.RUNNING, storedAction.getStatus());
+ }
+
+ /**
+  * Checks that one bulk insert stores a workflow job and two workflow actions
+  * built with {@code createWorkflow} and {@code createWorkflowAction}.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testInserts() throws Exception{
+     LiteWorkflowApp liteApp = new LiteWorkflowApp("testApp", "<workflow-app/>",
+             new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"));
+     WorkflowApp app = liteApp.addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+     Configuration jobConf = new Configuration();
+     jobConf.set(OozieClient.APP_PATH, new Path(getAppPath(), "workflow.xml").toString());
+     jobConf.set(OozieClient.LOG_TOKEN, "testToken");
+     jobConf.set(OozieClient.USER_NAME, getTestUser());
+
+     WorkflowJobBean wfJob = createWorkflow(app, jobConf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     // The two actions go into the list ahead of the job, as in the original scenario.
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+     toInsert.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+     BulkUpdateInsertForCoordActionStartJPAExecutor inserter = new BulkUpdateInsertForCoordActionStartJPAExecutor();
+     inserter.setInsertList(toInsert);
+     jpa.execute(inserter);
+
+     // Every inserted bean must be readable and still in PREP.
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("PREP", storedJob.getStatusStr());
+ }
+
+ /**
+  * Checks a combined bulk write: two workflow actions are inserted while a
+  * coordinator job and a workflow job are updated by the same executor.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testBulkInsertUpdates() throws Exception{
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     wfJob.setStatus(WorkflowJob.Status.RUNNING);
+     coordBean.setStatus(Job.Status.SUCCEEDED);
+
+     // Actions are inserted ...
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+
+     // ... and both jobs are updated.
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     jpa.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(toUpdate, toInsert));
+
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("SUCCEEDED", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("RUNNING", storedJob.getStatusStr());
+
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+ }
+
+ /**
+  * Checks that a failed bulk write leaves nothing behind: with the
+  * skip-commit fault injection switched on, the workflow job keeps its old
+  * status and neither workflow action from the insert list can be read.
+  *
+  * @throws Exception if a fixture helper or an unexpected JPA call fails
+  */
+ public void testBulkInsertUpdatesRollback() throws Exception{
+     WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+     job.setStatus(WorkflowJob.Status.RUNNING);
+     List<JsonBean> insertList = new ArrayList<JsonBean>();
+     // Add two actions to insert list
+     insertList.add(action1);
+     insertList.add(action2);
+
+     List<JsonBean> updateList = new ArrayList<JsonBean>();
+     // Add to update list
+     updateList.add(job);
+
+     JPAService jpaService = Services.get().get(JPAService.class);
+     assertNotNull(jpaService);
+
+     BulkUpdateInsertForCoordActionStartJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList);
+
+     // set fault injection to true, so the transaction is rolled back
+     setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+     setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+     try {
+         jpaService.execute(wfUpdateCmd1);
+         fail("Expected exception due to commit failure but didn't get any");
+     }
+     catch (Exception expected) {
+         // Expected: the injected fault makes the write fail. The catch stays
+         // broad because the exact exception type is not visible from this test.
+     }
+     finally {
+         // fail() throws an Error, which the catch above does not intercept;
+         // deactivating here keeps the fault from staying active in that case.
+         FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+     }
+
+     // Check whether transactions are rolled back or not
+     WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+     WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+     // status should not be RUNNING
+     assertEquals("PREP", wfBean.getStatusStr());
+
+     // Neither action may be readable: the lookup must fail with E0605.
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertForCoordActionStatusJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - insert and update operations for Coord Action
+ * Status commands
+ */
+public class TestBulkUpdateInsertForCoordActionStatusJPAExecutor extends XDataTestCase {
+ Services services;
+
+ /**
+  * Runs the parent set-up, then creates and initializes a new {@link Services}
+  * instance and calls {@code cleanUpDBTables()} before each test.
+  *
+  * @throws Exception if the parent set-up, service initialization or table clean-up fails
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     this.services = new Services();
+     this.services.init();
+     cleanUpDBTables();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} and then
+  * runs the parent tear-down.
+  *
+  * @throws Exception if the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     this.services.destroy();
+     super.tearDown();
+ }
+
+ /**
+  * Checks that one bulk update writes the changed status of a coordinator
+  * job, a workflow job and a workflow action.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testUpdates() throws Exception {
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfBean = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean actionBean = addRecordToWfActionTable(wfBean.getId(), "1", WorkflowAction.Status.PREP);
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     // Change the status on the in-memory beans only; the bulk executor below writes them.
+     coordBean.setStatus(CoordinatorJob.Status.RUNNING);
+     wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+     actionBean.setStatus(WorkflowAction.Status.RUNNING);
+
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfBean);
+     toUpdate.add(actionBean);
+
+     BulkUpdateInsertForCoordActionStatusJPAExecutor executor = new BulkUpdateInsertForCoordActionStatusJPAExecutor();
+     executor.setUpdateList(toUpdate);
+     jpa.execute(executor);
+
+     // Read every bean back and compare against the status set above.
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("RUNNING", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedWf = jpa.execute(new WorkflowJobGetJPAExecutor(wfBean.getId()));
+     assertEquals("SUCCEEDED", storedWf.getStatusStr());
+
+     WorkflowActionBean storedAction = jpa.execute(new WorkflowActionGetJPAExecutor(actionBean.getId()));
+     assertEquals(WorkflowAction.Status.RUNNING, storedAction.getStatus());
+ }
+
+ /**
+  * Checks that one bulk insert stores a workflow job and two workflow actions
+  * built with {@code createWorkflow} and {@code createWorkflowAction}.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testInserts() throws Exception{
+     LiteWorkflowApp liteApp = new LiteWorkflowApp("testApp", "<workflow-app/>",
+             new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"));
+     WorkflowApp app = liteApp.addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+     Configuration jobConf = new Configuration();
+     jobConf.set(OozieClient.APP_PATH, new Path(getAppPath(), "workflow.xml").toString());
+     jobConf.set(OozieClient.LOG_TOKEN, "testToken");
+     jobConf.set(OozieClient.USER_NAME, getTestUser());
+
+     WorkflowJobBean wfJob = createWorkflow(app, jobConf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     // The two actions go into the list ahead of the job, as in the original scenario.
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+     toInsert.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+     BulkUpdateInsertForCoordActionStatusJPAExecutor inserter = new BulkUpdateInsertForCoordActionStatusJPAExecutor();
+     inserter.setInsertList(toInsert);
+     jpa.execute(inserter);
+
+     // Every inserted bean must be readable and still in PREP.
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("PREP", storedJob.getStatusStr());
+ }
+
+ /**
+  * Checks a combined bulk write: two workflow actions are inserted while a
+  * coordinator job and a workflow job are updated by the same executor.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testBulkInsertUpdates() throws Exception{
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     wfJob.setStatus(WorkflowJob.Status.RUNNING);
+     coordBean.setStatus(Job.Status.SUCCEEDED);
+
+     // Actions are inserted ...
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+
+     // ... and both jobs are updated.
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     jpa.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(toUpdate, toInsert));
+
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("SUCCEEDED", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("RUNNING", storedJob.getStatusStr());
+
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+ }
+
+ /**
+  * Checks that a failed bulk write leaves nothing behind: with the
+  * skip-commit fault injection switched on, the workflow job keeps its old
+  * status and neither workflow action from the insert list can be read.
+  *
+  * @throws Exception if a fixture helper or an unexpected JPA call fails
+  */
+ public void testBulkInsertUpdatesRollback() throws Exception{
+     WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+     job.setStatus(WorkflowJob.Status.RUNNING);
+     List<JsonBean> insertList = new ArrayList<JsonBean>();
+     // Add two actions to insert list
+     insertList.add(action1);
+     insertList.add(action2);
+
+     List<JsonBean> updateList = new ArrayList<JsonBean>();
+     // Add to update list
+     updateList.add(job);
+
+     JPAService jpaService = Services.get().get(JPAService.class);
+     assertNotNull(jpaService);
+
+     BulkUpdateInsertForCoordActionStatusJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList);
+
+     // set fault injection to true, so the transaction is rolled back
+     setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+     setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+     try {
+         jpaService.execute(wfUpdateCmd1);
+         fail("Expected exception due to commit failure but didn't get any");
+     }
+     catch (Exception expected) {
+         // Expected: the injected fault makes the write fail. The catch stays
+         // broad because the exact exception type is not visible from this test.
+     }
+     finally {
+         // fail() throws an Error, which the catch above does not intercept;
+         // deactivating here keeps the fault from staying active in that case.
+         FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+     }
+
+     // Check whether transactions are rolled back or not
+     WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+     WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+     // status should not be RUNNING
+     assertEquals("PREP", wfBean.getStatusStr());
+
+     // Neither action may be readable: the lookup must fail with E0605.
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowApp;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+
+/**
+ * Testcases for bulk JPA writes - inserts and updates
+ */
+public class TestBulkUpdateInsertJPAExecutor extends XDataTestCase {
+ Services services;
+
+ /**
+  * Runs the parent set-up, then creates and initializes a new {@link Services}
+  * instance and calls {@code cleanUpDBTables()} before each test.
+  *
+  * @throws Exception if the parent set-up, service initialization or table clean-up fails
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     this.services = new Services();
+     this.services.init();
+     cleanUpDBTables();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} and then
+  * runs the parent tear-down.
+  *
+  * @throws Exception if the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     this.services.destroy();
+     super.tearDown();
+ }
+
+ /**
+  * Checks that one bulk update writes the changed status of a coordinator
+  * job, a workflow job and a workflow action.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testUpdates() throws Exception {
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfBean = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean actionBean = addRecordToWfActionTable(wfBean.getId(), "1", WorkflowAction.Status.PREP);
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     // Change the status on the in-memory beans only; the bulk executor below writes them.
+     coordBean.setStatus(CoordinatorJob.Status.RUNNING);
+     wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+     actionBean.setStatus(WorkflowAction.Status.RUNNING);
+
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfBean);
+     toUpdate.add(actionBean);
+
+     BulkUpdateInsertJPAExecutor executor = new BulkUpdateInsertJPAExecutor();
+     executor.setUpdateList(toUpdate);
+     jpa.execute(executor);
+
+     // Read every bean back and compare against the status set above.
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("RUNNING", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedWf = jpa.execute(new WorkflowJobGetJPAExecutor(wfBean.getId()));
+     assertEquals("SUCCEEDED", storedWf.getStatusStr());
+
+     WorkflowActionBean storedAction = jpa.execute(new WorkflowActionGetJPAExecutor(actionBean.getId()));
+     assertEquals(WorkflowAction.Status.RUNNING, storedAction.getStatus());
+ }
+
+ /**
+  * Checks that one bulk insert stores a workflow job and two workflow actions
+  * built with {@code createWorkflow} and {@code createWorkflowAction}.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testInserts() throws Exception{
+     LiteWorkflowApp liteApp = new LiteWorkflowApp("testApp", "<workflow-app/>",
+             new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"));
+     WorkflowApp app = liteApp.addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+
+     Configuration jobConf = new Configuration();
+     jobConf.set(OozieClient.APP_PATH, new Path(getAppPath(), "workflow.xml").toString());
+     jobConf.set(OozieClient.LOG_TOKEN, "testToken");
+     jobConf.set(OozieClient.USER_NAME, getTestUser());
+
+     WorkflowJobBean wfJob = createWorkflow(app, jobConf, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     // The two actions go into the list ahead of the job, as in the original scenario.
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+     toInsert.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+     BulkUpdateInsertJPAExecutor inserter = new BulkUpdateInsertJPAExecutor();
+     inserter.setInsertList(toInsert);
+     jpa.execute(inserter);
+
+     // Every inserted bean must be readable and still in PREP.
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("PREP", storedJob.getStatusStr());
+ }
+
+ /**
+  * Checks a combined bulk write: two workflow actions are inserted while a
+  * coordinator job and a workflow job are updated by the same executor.
+  *
+  * @throws Exception if a fixture helper or a JPA call fails
+  */
+ public void testBulkInsertUpdates() throws Exception{
+     CoordinatorJobBean coordBean = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, true, true);
+     WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean firstAction = createWorkflowAction(wfJob.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean secondAction = createWorkflowAction(wfJob.getId(), "2", WorkflowAction.Status.PREP);
+
+     wfJob.setStatus(WorkflowJob.Status.RUNNING);
+     coordBean.setStatus(Job.Status.SUCCEEDED);
+
+     // Actions are inserted ...
+     List<JsonBean> toInsert = new ArrayList<JsonBean>();
+     toInsert.add(firstAction);
+     toInsert.add(secondAction);
+
+     // ... and both jobs are updated.
+     List<JsonBean> toUpdate = new ArrayList<JsonBean>();
+     toUpdate.add(coordBean);
+     toUpdate.add(wfJob);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+
+     jpa.execute(new BulkUpdateInsertJPAExecutor(toUpdate, toInsert));
+
+     CoordinatorJobBean storedCoord = jpa.execute(new CoordJobGetJPAExecutor(coordBean.getId()));
+     assertEquals("SUCCEEDED", storedCoord.getStatusStr());
+
+     WorkflowJobBean storedJob = jpa.execute(new WorkflowJobGetJPAExecutor(wfJob.getId()));
+     assertEquals("RUNNING", storedJob.getStatusStr());
+
+     WorkflowActionBean storedFirst = jpa.execute(new WorkflowActionGetJPAExecutor(firstAction.getId()));
+     assertEquals("PREP", storedFirst.getStatusStr());
+
+     WorkflowActionBean storedSecond = jpa.execute(new WorkflowActionGetJPAExecutor(secondAction.getId()));
+     assertEquals("PREP", storedSecond.getStatusStr());
+ }
+
+ /**
+  * Checks that a failed bulk write leaves nothing behind: with the
+  * skip-commit fault injection switched on, the workflow job keeps its old
+  * status and neither workflow action from the insert list can be read.
+  *
+  * @throws Exception if a fixture helper or an unexpected JPA call fails
+  */
+ public void testBulkInsertUpdatesRollback() throws Exception{
+     WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     WorkflowActionBean action1 = createWorkflowAction(job.getId(), "1", WorkflowAction.Status.PREP);
+     WorkflowActionBean action2 = createWorkflowAction(job.getId(), "2", WorkflowAction.Status.PREP);
+
+     job.setStatus(WorkflowJob.Status.RUNNING);
+     List<JsonBean> insertList = new ArrayList<JsonBean>();
+     // Add two actions to insert list
+     insertList.add(action1);
+     insertList.add(action2);
+
+     List<JsonBean> updateList = new ArrayList<JsonBean>();
+     // Add to update list
+     updateList.add(job);
+
+     JPAService jpaService = Services.get().get(JPAService.class);
+     assertNotNull(jpaService);
+
+     BulkUpdateInsertJPAExecutor wfUpdateCmd1 = new BulkUpdateInsertJPAExecutor(updateList, insertList);
+
+     // set fault injection to true, so the transaction is rolled back
+     setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+     setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+     try {
+         jpaService.execute(wfUpdateCmd1);
+         fail("Expected exception due to commit failure but didn't get any");
+     }
+     catch (Exception expected) {
+         // Expected: the injected fault makes the write fail. The catch stays
+         // broad because the exact exception type is not visible from this test.
+     }
+     finally {
+         // fail() throws an Error, which the catch above does not intercept;
+         // deactivating here keeps the fault from staying active in that case.
+         FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+     }
+
+     // Check whether transactions are rolled back or not
+     WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+     WorkflowJobBean wfBean = jpaService.execute(wfGetCmd);
+     // status should not be RUNNING
+     assertEquals("PREP", wfBean.getStatusStr());
+
+     // Neither action may be readable: the lookup must fail with E0605.
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action1.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+
+     try {
+         jpaService.execute(new WorkflowActionGetJPAExecutor(action2.getId()));
+         fail("Expected exception but didnt get any");
+     }
+     catch (JPAExecutorException jpaee) {
+         assertEquals(ErrorCode.E0605, jpaee.getErrorCode());
+     }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForInputCheckJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Verifies that the status, action XML and missing dependencies set on a coordinator action
+ * can be read back after running {@link CoordActionUpdateForInputCheckJPAExecutor}.
+ */
+public class TestCoordActionUpdateForInputCheckJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Initializes a new {@link Services} instance and calls {@code cleanUpDBTables()} before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()}, then runs the base-class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Adds a RUNNING coordinator job and action through the base-class helpers, then checks the update. */
+    public void testCoordActionUpdateStatus() throws Exception {
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        _testCoordActionUpdateStatus(coordAction);
+    }
+
+    /** Modifies {@code action}, runs the update executor, reloads the action by id and compares the fields. */
+    private void _testCoordActionUpdateStatus(CoordinatorActionBean action) throws Exception {
+        JPAService jpa = Services.get().get(JPAService.class);
+        assertNotNull(jpa);
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        action.setActionXml("dummyXml");
+        action.setMissingDependencies("dummyDependencies");
+        // Run the update executor with the modified bean, then fetch the action again by its id.
+        jpa.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        CoordinatorActionBean reloaded = jpa.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(reloaded);
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, reloaded.getStatus());
+        assertEquals("dummyXml", reloaded.getActionXml());
+        assertEquals("dummyDependencies", reloaded.getMissingDependencies());
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Verifies that a coordinator action reloaded after {@link CoordActionUpdateForModifiedTimeJPAExecutor}
+ * ran has a last-modified time later than a timestamp taken just before the update.
+ */
+public class TestCoordActionUpdateForModifiedTimeJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Initializes {@link Services} and calls {@code cleanUpDBTables()}; overrides the base-class hook. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()}, then runs the base-class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Adds a RUNNING coordinator job and action through the base-class helpers, then checks the update. */
+    public void testCoordActionUpdateModifiedTime() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        _testCoordActionUpdateModifiedTime(action);
+    }
+
+    /** Runs the executor on {@code action} and compares last-modified times against a "now" snapshot. */
+    private void _testCoordActionUpdateModifiedTime(CoordinatorActionBean action) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        Date currentDate = new Date();
+        // NOTE(review): both checks are strict, so they need at least 1 ms between timestamps - confirm.
+        assertTrue(currentDate.getTime() - action.getLastModifiedTime().getTime() > 0);
+        jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(action));
+        CoordinatorActionBean newAction = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(newAction);
+        assertTrue(newAction.getLastModifiedTime().getTime() - currentDate.getTime() > 0);
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests that a status written through {@link CoordActionUpdateJPAExecutor} is seen when reloading the action. */
+public class TestCoordActionUpdateJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Initializes a new {@link Services} instance and calls {@code cleanUpDBTables()} before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()}, then runs the base-class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Adds a RUNNING coordinator job and action through the base-class helpers, then checks the update. */
+    public void testCoordActionUpdate() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        _testCoordActionUpdate(action);
+    }
+
+    /** Sets {@code action} to SUCCEEDED, runs the update executor and checks the reloaded status. */
+    private void _testCoordActionUpdate(CoordinatorActionBean action) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        CoordinatorActionBean newAction = jpaService.execute(new CoordActionGetJPAExecutor(action.getId()));
+        assertNotNull(newAction);
+        // Expected value goes first so a failure message reports expected/actual the right way round.
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, newAction.getStatus());
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateStatusJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests that status and pending written by {@link CoordActionUpdateStatusJPAExecutor} are seen on reload. */
+public class TestCoordActionUpdateStatusJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Initializes a new {@link Services} instance and calls {@code cleanUpDBTables()} before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()}, then runs the base-class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Adds a RUNNING coordinator job and action through the base-class helpers, then checks the update. */
+    public void testCoordActionUpdateStatus() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        _testCoordActionUpdateStatus(action);
+    }
+
+    /** Changes status and pending on {@code action}, runs the update executor and checks the reloaded values. */
+    private void _testCoordActionUpdateStatus(CoordinatorActionBean action) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        // Update the status of action to "SUCCEEDED" from "RUNNING"
+        action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        // Update pending to 1 from 0
+        action.setPending(1);
+        // Call the JPAUpdate executor to execute the Update command
+        CoordActionUpdateStatusJPAExecutor coordUpdCmd = new CoordActionUpdateStatusJPAExecutor(action);
+        jpaService.execute(coordUpdCmd);
+        CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(action.getId());
+        CoordinatorActionBean newAction = jpaService.execute(coordGetCmd);
+        assertNotNull(newAction);
+        // Check for expected values (expected argument first, per the JUnit assertEquals contract)
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, newAction.getStatus());
+        assertEquals(1, newAction.getPending());
+    }
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java?rev=1520828&r1=1520827&r2=1520828&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobUpdateJPAExecutor.java Sun Sep 8 02:29:31 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests that a status written through {@link CoordJobUpdateJPAExecutor} is seen when the job is reloaded. */
+public class TestCoordJobUpdateJPAExecutor extends XDataTestCase {
+    Services services;
+
+    /** Initializes a new {@link Services} instance and calls {@code cleanUpDBTables()} before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+    }
+
+    /** Destroys the services created in {@link #setUp()}, then runs the base-class tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Adds a RUNNING coordinator job through the base-class helper and checks the update on it. */
+    public void testCoordJobUpdate() throws Exception {
+        _testUpdateJob(addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false).getId());
+    }
+
+    /** Loads the job, sets it to SUCCEEDED, runs the update executor and checks the reloaded status. */
+    private void _testUpdateJob(String jobId) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordJobGetJPAExecutor jobGetCmd = new CoordJobGetJPAExecutor(jobId);
+        CoordinatorJobBean job1 = jpaService.execute(jobGetCmd);
+        job1.setStatus(CoordinatorJob.Status.SUCCEEDED);
+        jpaService.execute(new CoordJobUpdateJPAExecutor(job1));
+        CoordinatorJobBean job2 = jpaService.execute(jobGetCmd);
+        assertEquals(CoordinatorJob.Status.SUCCEEDED, job2.getStatus());
+    }
+}