You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by bz...@apache.org on 2015/05/01 22:45:22 UTC
[1/2] oozie git commit: OOZIE-2163 Remove CoordinatorStore (seoeun25
via bzhang)
Repository: oozie
Updated Branches:
refs/heads/master 60f3d18f5 -> 6ea1ed833
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
index 53c5122..c199595 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
@@ -19,7 +19,6 @@
package org.apache.oozie.command.coord;
import java.io.ByteArrayInputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@@ -48,11 +47,9 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.dependency.URIHandler;
-import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -60,9 +57,7 @@ import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
-import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
@@ -100,39 +95,26 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId);
assertEquals(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), false, true);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
+
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
assertNull(action2.getExternalId());
- store2.commitTrx();
- store2.closeTrx();
}
/**
@@ -146,37 +128,27 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = Integer.toString(actionNum1) + "-" + Integer.toString(actionNum2);
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, rerunScope, false, true);
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId1, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId1);
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- CoordinatorActionBean action2 = store1.getCoordinatorAction(actionId2, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId2);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
}
/**
@@ -190,36 +162,26 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = Integer.toString(actionNum1) + "," + Integer.toString(actionNum2);
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, rerunScope, false, true);
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId1, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId1);
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- CoordinatorActionBean action2 = store1.getCoordinatorAction(actionId2, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId2);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
}
/**
@@ -233,23 +195,17 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = "1-3";
try {
@@ -269,21 +225,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.RUNNING,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.RUNNING,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
try {
final OozieClient coordClient = LocalOozie.getCoordClient();
@@ -297,12 +247,8 @@ public class TestCoordRerunXCommand extends XDataTestCase {
}
}
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertEquals(action2.getStatus(), CoordinatorAction.Status.RUNNING);
- store2.commitTrx();
- store2.closeTrx();
}
/**
@@ -314,38 +260,24 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId);
assertEquals(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_DATE, "2009-12-15T01:00Z", false, true);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store2.commitTrx();
- store2.closeTrx();
}
/**
@@ -359,37 +291,27 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = "2009-12-15T01:00Z" + "::" + "2009-12-16T01:00Z";
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_DATE, rerunScope, false, true);
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId1, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId1);
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- CoordinatorActionBean action2 = store1.getCoordinatorAction(actionId2, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId2);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
}
/**
@@ -403,37 +325,27 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = "2009-12-15T01:00Z" + "," + "2009-12-16T01:00Z";
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_DATE, rerunScope, false, true);
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId1, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId1);
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- CoordinatorActionBean action2 = store1.getCoordinatorAction(actionId2, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId2);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
}
/**
@@ -448,37 +360,27 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = "2009-12-15T01:00Z" + "::" + "2009-12-17T01:00Z";
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_DATE, rerunScope, false, true);
- CoordinatorStore store1 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store1.beginTrx();
- CoordinatorActionBean action1 = store1.getCoordinatorAction(actionId1, false);
+ CoordinatorActionBean action1 = getCoordinatorAction(actionId1);
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- CoordinatorActionBean action2 = store1.getCoordinatorAction(actionId2, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId2);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store1.commitTrx();
- store1.closeTrx();
}
/**
@@ -493,23 +395,17 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final int actionNum2 = 2;
final String actionId1 = jobId + "@" + actionNum1;
final String actionId2 = jobId + "@" + actionNum2;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum1, actionId1, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum1, actionId1, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- addRecordToActionTable(jobId, actionNum2, actionId2, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToActionTable(jobId, actionNum2, actionId2, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action2.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String rerunScope = "2009-12-15T01:00Z,2009-12-16T01:00Z,2009-12-17T01:00Z";
try {
final OozieClient coordClient = LocalOozie.getCoordClient();
@@ -530,21 +426,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
Path appPath = new Path(getFsTestCaseDir(), "coord");
String inputDir = appPath.toString() + "/coord-input/2010/07/09/01/00";
FileSystem fs = getFileSystem();
@@ -554,12 +444,8 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), true, true);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store2.commitTrx();
- store2.closeTrx();
waitFor(120 * 1000, new Predicate() {
@Override
@@ -569,16 +455,12 @@ public class TestCoordRerunXCommand extends XDataTestCase {
}
});
- CoordinatorStore store3 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store3.beginTrx();
- CoordinatorActionBean action3 = store3.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action3 = getCoordinatorAction(actionId);
String actionXml = action3.getActionXml();
System.out.println("After refresh, action xml= " + actionXml);
Element eAction = XmlUtils.parseXml(actionXml);
String[] urls = getActionXmlUrls(eAction, getTestUser(), getTestGroup());
- store3.commitTrx();
- store3.closeTrx();
/* if (urls != null) {
assertEquals(inputDir, urls[0]);
@@ -597,21 +479,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action4.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
Path appPath = new Path(getFsTestCaseDir(), "coord");
String outputDir = appPath.toString() + "/coord-input/2009/12/14/11/00";
Path success = new Path(outputDir, "_SUCCESS");
@@ -625,12 +501,8 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), false, false);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store2.commitTrx();
- store2.closeTrx();
waitFor(120 * 1000, new Predicate() {
@Override
@@ -655,21 +527,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
Path appPath = new Path(getFsTestCaseDir(), "coord");
String outputDir = appPath.toString() + "/coord-input/2009/12/14/11/00";
Path success = new Path(outputDir, "_SUCCESS");
@@ -682,12 +548,8 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), false, false);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store2.commitTrx();
- store2.closeTrx();
waitFor(120 * 1000, new Predicate() {
@Override
@@ -714,21 +576,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml", true);
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
String db = "mydb";
String table = "mytable";
@@ -773,30 +629,20 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.SUCCEEDED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action3.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), false, false);
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- store2.commitTrx();
- store2.closeTrx();
waitFor(120 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
@@ -817,10 +663,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
public void testCoordRerunInFailed() throws Exception {
CoordinatorJobBean job = this.addRecordToCoordJobTable(Job.Status.FAILED, false, false);
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.FAILED, job.getStatus());
try {
@@ -848,13 +691,12 @@ public class TestCoordRerunXCommand extends XDataTestCase {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.DONEWITHERROR, job.getStatus());
new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_SCOPE_DATE, "2009-12-15T01:00Z", false, true, false,
null).call();
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus());
}
@@ -871,14 +713,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.PAUSED, job.getStatus());
new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_SCOPE_DATE, "2009-12-15T01:00Z", false, true, false,
null).call();
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.PAUSED, job.getStatus());
assertNotNull(job.getPauseTime());
}
@@ -896,14 +737,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetJPAExecutor coordJobGetExecutor = new CoordJobGetJPAExecutor(job.getId());
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
new CoordRerunXCommand(job.getId(), RestConstants.JOB_COORD_SCOPE_DATE, "2009-12-15T01:00Z", false, true, false,
null).call();
- job = jpaService.execute(coordJobGetExecutor);
+ job = getCoordinatorJob(job.getId());
assertEquals(Job.Status.PAUSEDWITHERROR, job.getStatus());
assertNotNull(job.getPauseTime());
}
@@ -917,21 +757,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.KILLED);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.KILLED);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.SUCCEEDED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
try {
final OozieClient coordClient = LocalOozie.getCoordClient();
@@ -943,12 +777,8 @@ public class TestCoordRerunXCommand extends XDataTestCase {
fail("Coord rerun failed");
}
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertEquals(action2.getStatus(), CoordinatorAction.Status.WAITING);
- store2.commitTrx();
- store2.closeTrx();
}
/*
@@ -958,21 +788,15 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C";
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
try {
- addRecordToJobTable(jobId, store, CoordinatorJob.Status.RUNNING);
- addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.FAILED,
+ addRecordToJobTable(jobId, CoordinatorJob.Status.RUNNING);
+ addRecordToActionTable(jobId, actionNum, actionId, CoordinatorAction.Status.FAILED,
"coord-rerun-action1.xml");
- store.commitTrx();
}
catch (Exception e) {
e.printStackTrace();
fail("Could not update db.");
}
- finally {
- store.closeTrx();
- }
try {
final OozieClient coordClient = LocalOozie.getCoordClient();
@@ -983,8 +807,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
ex.printStackTrace();
fail("Coord rerun failed");
}
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action2 = getCoordinatorAction(actionId);
assertEquals(action2.getStatus(), CoordinatorAction.Status.WAITING);
assertEquals(action2.getErrorCode(), "");
assertEquals(action2.getErrorMessage(), "");
@@ -1026,16 +849,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(coordJob.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, rerunScope, false, true);
- CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
- coordJob = jpaService.execute(coordJobGetCmd);
+ coordJob = getCoordinatorJob(coordJob.getId());
assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
- CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action1.getId());
- action1 = jpaService.execute(coordActionGetCmd);
+ action1 = getCoordinatorAction(action1.getId());
assertNotSame(action1.getStatus(), CoordinatorAction.Status.FAILED);
- coordActionGetCmd = new CoordActionGetJPAExecutor(action2.getId());
- action2 = jpaService.execute(coordActionGetCmd);
+ action2 = getCoordinatorAction(action2.getId());
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
}
@@ -1075,16 +895,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(coordJob.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, rerunScope, false, true);
- CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
- coordJob = jpaService.execute(coordJobGetCmd);
+ coordJob = getCoordinatorJob(coordJob.getId());
assertEquals(CoordinatorJob.Status.SUSPENDED, coordJob.getStatus());
- CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action1.getId());
- action1 = jpaService.execute(coordActionGetCmd);
+ action1 = getCoordinatorAction(action1.getId());
assertNotSame(action1.getStatus(), CoordinatorAction.Status.FAILED);
- coordActionGetCmd = new CoordActionGetJPAExecutor(action2.getId());
- action2 = jpaService.execute(coordActionGetCmd);
+ action2 = getCoordinatorAction(action2.getId());
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
}
@@ -1125,16 +942,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final OozieClient coordClient = LocalOozie.getCoordClient();
coordClient.reRunCoord(coordJob.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, rerunScope, false, true);
- CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
- coordJob = jpaService.execute(coordJobGetCmd);
+ coordJob = getCoordinatorJob(coordJob.getId());
assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus());
- CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action1.getId());
- action1 = jpaService.execute(coordActionGetCmd);
+ action1 = getCoordinatorAction(action1.getId());
assertNotSame(action1.getStatus(), CoordinatorAction.Status.SUCCEEDED);
- coordActionGetCmd = new CoordActionGetJPAExecutor(action2.getId());
- action2 = jpaService.execute(coordActionGetCmd);
+ action2 = getCoordinatorAction(action2.getId());
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
}
@@ -1157,7 +971,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
return coordJob;
}
- private void addRecordToJobTable(String jobId, CoordinatorStore store, CoordinatorJob.Status status)
+ private void addRecordToJobTable(String jobId, CoordinatorJob.Status status)
throws StoreException, IOException {
Path appPath = new Path(getFsTestCaseDir(), "coord");
String appXml = getCoordJobXml(appPath);
@@ -1199,22 +1013,21 @@ public class TestCoordRerunXCommand extends XDataTestCase {
}
try {
- store.insertCoordinatorJob(coordJob);
- }
- catch (StoreException se) {
- se.printStackTrace();
- store.rollbackTrx();
+ addRecordToCoordJobTable(coordJob);
+ } catch (Exception e) {
+ e.printStackTrace();
fail("Unable to insert the test job record to table");
- throw se;
+ throw new StoreException(ErrorCode.E1019, e.getMessage());
}
+
}
- private void addRecordToActionTable(String jobId, int actionNum, String actionId, CoordinatorStore store,
+ private void addRecordToActionTable(String jobId, int actionNum, String actionId,
CoordinatorAction.Status status, String resourceXmlName) throws StoreException, IOException {
- addRecordToActionTable(jobId, actionNum, actionId, store, status, resourceXmlName, false);
+ addRecordToActionTable(jobId, actionNum, actionId, status, resourceXmlName, false);
}
- private void addRecordToActionTable(String jobId, int actionNum, String actionId, CoordinatorStore store,
+ private void addRecordToActionTable(String jobId, int actionNum, String actionId,
CoordinatorAction.Status status, String resourceXmlName, boolean isHCatDep) throws StoreException,
IOException {
Path appPath = new Path(getFsTestCaseDir(), "coord");
@@ -1253,14 +1066,13 @@ public class TestCoordRerunXCommand extends XDataTestCase {
}
try {
- store.insertCoordinatorAction(action);
- }
- catch (StoreException se) {
- se.printStackTrace();
- store.rollbackTrx();
+ addRecordToCoordActionTable(action, null);
+ } catch (Exception e) {
+ e.printStackTrace();
fail("Unable to insert the test job record to table");
- throw se;
+ throw new StoreException(ErrorCode.E1019, e.getMessage());
}
+
}
private Properties getCoordProp(Path appPath) throws IOException {
@@ -1394,8 +1206,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
String actionId = action.getId();
new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call();
- final JPAService jpaService = Services.get().get(JPAService.class);
- action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ action = getCoordinatorAction(actionId);
if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :"
@@ -1523,8 +1334,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String actionId = action.getId();
new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call();
- final JPAService jpaService = Services.get().get(JPAService.class);
- action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ action = getCoordinatorAction(actionId);
if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :"
@@ -1585,8 +1395,7 @@ public class TestCoordRerunXCommand extends XDataTestCase {
final String actionId = action.getId();
new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call();
- final JPAService jpaService = Services.get().get(JPAService.class);
- action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ action = getCoordinatorAction(actionId);
if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :"
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestFutureActionsTimeOut.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestFutureActionsTimeOut.java b/core/src/test/java/org/apache/oozie/command/coord/TestFutureActionsTimeOut.java
index 801207b..a6fce4e 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestFutureActionsTimeOut.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestFutureActionsTimeOut.java
@@ -36,9 +36,9 @@ import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XConfiguration;
@@ -133,12 +133,11 @@ public class TestFutureActionsTimeOut extends XTestCase {
* @throws StoreException
*/
private void checkCoordJob(String jobId) throws StoreException {
- CoordinatorStore store = Services.get().get(StoreService.class)
- .getStore(CoordinatorStore.class);
+ CoordinatorJobBean coordJob = null;
try {
- CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
+ coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobId);
}
- catch (StoreException se) {
+ catch (JPAExecutorException e) {
fail("Job ID " + jobId + " was not stored properly in db");
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestPastActionsTimeOut.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestPastActionsTimeOut.java b/core/src/test/java/org/apache/oozie/command/coord/TestPastActionsTimeOut.java
index 708dcaf..f4327ae 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestPastActionsTimeOut.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestPastActionsTimeOut.java
@@ -32,9 +32,9 @@ import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XConfiguration;
@@ -98,11 +98,11 @@ public class TestPastActionsTimeOut extends XTestCase {
* @throws StoreException
*/
private void checkCoordJob(String jobId) throws StoreException {
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
try {
- CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
+ CoordinatorJobBean job = CoordJobQueryExecutor.getInstance()
+ .get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobId);
}
- catch (StoreException se) {
+ catch (JPAExecutorException se) {
fail("Job ID " + jobId + " was not stored properly in db");
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 9f8e65f..c6ecd76 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -60,8 +60,6 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
@@ -270,18 +268,10 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordCreate() throws Exception {
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
final BundleActionBean bundleAction;
final BundleJobBean bundle;
- store.beginTrx();
- try {
- bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
- store.commitTrx();
- }
- finally {
- store.closeTrx();
- }
+ bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+ bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);
@@ -319,20 +309,12 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordExists() throws Exception {
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
final BundleActionBean bundleAction;
final BundleJobBean bundle;
final CoordinatorJob coord;
- store.beginTrx();
- try {
- bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
- coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
- store.commitTrx();
- }
- finally {
- store.closeTrx();
- }
+ bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+ coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
+ bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);
@@ -362,20 +344,12 @@ public class TestRecoveryService extends XDataTestCase {
final int actionNum = 1;
final String actionId = jobId + "@" + actionNum;
final CoordinatorEngine ce = new CoordinatorEngine(getTestUser());
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store.beginTrx();
- try {
- createTestCaseSubDir("one-op");
- createTestCaseSubDir("one-op", "lib");
- createTestCaseSubDir("workflows");
- createTestCaseSubDir("in");
- addRecordToJobTable(jobId, store, getTestCaseDir());
- addRecordToActionTable(jobId, actionNum, actionId, store, getTestCaseDir());
- store.commitTrx();
- }
- finally {
- store.closeTrx();
- }
+ createTestCaseSubDir("one-op");
+ createTestCaseSubDir("one-op", "lib");
+ createTestCaseSubDir("workflows");
+ createTestCaseSubDir("in");
+ addRecordToJobTable(jobId, getTestCaseDir());
+ addRecordToActionTable(jobId, actionNum, actionId, getTestCaseDir());
sleep(3000);
Runnable recoveryRunnable = new RecoveryRunnable(0, 1,1);
@@ -388,9 +362,7 @@ public class TestRecoveryService extends XDataTestCase {
}
});
- CoordinatorStore store2 = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
- store2.beginTrx();
- CoordinatorActionBean action = store2.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action = getCoordinatorAction(actionId);
if (action.getStatus() == CoordinatorAction.Status.RUNNING
|| action.getStatus() == CoordinatorAction.Status.SUCCEEDED) {
@@ -398,8 +370,6 @@ public class TestRecoveryService extends XDataTestCase {
else {
fail();
}
- store2.commitTrx();
- store2.closeTrx();
}
/**
@@ -664,7 +634,8 @@ public class TestRecoveryService extends XDataTestCase {
}
}
- private void addRecordToActionTable(String jobId, int actionNum, String actionId, CoordinatorStore store, String baseDir) throws StoreException, IOException {
+ private void addRecordToActionTable(String jobId, int actionNum, String actionId, String baseDir)
+ throws Exception {
CoordinatorActionBean action = new CoordinatorActionBean();
action.setJobId(jobId);
action.setId(actionId);
@@ -730,7 +701,7 @@ public class TestRecoveryService extends XDataTestCase {
createdConf = conf.toXmlString(false);
action.setCreatedConf(createdConf);
- store.insertCoordinatorAction(action);
+ addRecordToCoordActionTable(action, null);
String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='one-op-wf'>";
content += "<start to='fs1'/><action name='fs1'><fs><mkdir path='/tmp'/></fs><ok to='end'/><error to='end'/></action>";
content += "<end name='end' /></workflow-app>";
@@ -761,7 +732,7 @@ public class TestRecoveryService extends XDataTestCase {
new File(dir, "_SUCCESS").mkdirs();
}
- private void addRecordToJobTable(String jobId, CoordinatorStore store, String baseDir) throws StoreException {
+ private void addRecordToJobTable(String jobId, String baseDir) throws Exception {
CoordinatorJobBean coordJob = new CoordinatorJobBean();
coordJob.setId(jobId);
coordJob.setAppName("testApp");
@@ -829,11 +800,10 @@ public class TestRecoveryService extends XDataTestCase {
}
try {
- store.insertCoordinatorJob(coordJob);
+ addRecordToCoordJobTable(coordJob);
}
- catch (StoreException se) {
+ catch (Exception se) {
se.printStackTrace();
- store.rollbackTrx();
fail("Unable to insert the test job record to table");
throw se;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/store/TestCoordinatorStore.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/store/TestCoordinatorStore.java b/core/src/test/java/org/apache/oozie/store/TestCoordinatorStore.java
deleted file mode 100644
index b8b2405..0000000
--- a/core/src/test/java/org/apache/oozie/store/TestCoordinatorStore.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.store;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.apache.oozie.service.CoordinatorStoreService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XTestCase;
-
-public class TestCoordinatorStore extends XTestCase {
- Services services;
- CoordinatorStore store;
- CoordinatorJobBean coordBean;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- store = Services.get().get(CoordinatorStoreService.class).create();
- }
-
- @Override
- protected void tearDown() throws Exception {
- // dropSchema(dbName, conn);
- services.destroy();
- super.tearDown();
- }
-
- public void testCoordStore() throws StoreException {
- String jobId = "00000-" + new Date().getTime() + "-TestCoordinatorStore-C";
- String actionId = jobId + "_1";
- try {
- _testInsertJob(jobId);
- _testGetJob(jobId);
- _testGetMatJobLists();
- _testUpdateCoordJob(jobId);
- _testInsertAction(jobId, actionId);
- _testGetAction(jobId, actionId);
- _testGetActionForJob(jobId, actionId);
- _testGetActionForJobInExecOrder(jobId, actionId);
- _testGetActionForJobInLastOnly(jobId, actionId);
- _testGetActionRunningCount(actionId);
- _testGetRecoveryActionsGroupByJobId(jobId);
- _testUpdateCoordAction(actionId);
- _testUpdateCoordActionMin(actionId);
- }
- finally {
- // store.closeTrx();
- }
- }
-
- private void _testUpdateCoordAction(String actionId) {
- store.beginTrx();
- try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, true);
- int newActNum = action.getActionNumber() + 1;
- action.setActionNumber(newActNum);
- store.updateCoordinatorAction(action);
- store.getEntityManager().flush();
- store.getEntityManager().merge(action);
- action = store.getCoordinatorAction(actionId, false);
- assertEquals(newActNum, action.getActionNumber());
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to Update a record in Coord Action. actionId =" + actionId);
- }
-
- }
-
- private void _testUpdateCoordActionMin(String actionId) {
- store.beginTrx();
- try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, true);
- action.setStatus(CoordinatorAction.Status.SUCCEEDED);
- action.setMissingDependencies("d1,d2,d3");
- action.setActionNumber(777);
- Date lastModifiedTime = new Date();
- action.setLastModifiedTime(lastModifiedTime);
- store.updateCoordActionMin(action);
- store.commitTrx();
- //store.getEntityManager().merge(action);
- action = getCoordAction(actionId);
- assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
- assertEquals("d1,d2,d3", action.getMissingDependencies());
- //assertEquals(lastModifiedTime, action.getLastModifiedTime());
- if (action.getActionNumber() == 777) {
- fail("Action number should not be updated");
- }
- }
- catch (Exception ex) {
- if (store.isActive()) {
- store.rollbackTrx();
- }
- ex.printStackTrace();
- fail("Unable to Update a record in Coord Action. actionId =" + actionId);
- }
-
- }
-
- private void _testGetActionRunningCount(String actionId) {
- store.beginTrx();
- try {
- int count = store.getCoordinatorRunningActionsCount(actionId);
- assertEquals(count, 0);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET count for action ID. actionId =" + actionId);
- }
- }
-
-
- private void _testGetActionForJobInExecOrder(String jobId, String actionId) {
- store.beginTrx();
- try {
- List<CoordinatorActionBean> actionList = store.getCoordinatorActionsForJob(jobId, 1,
- CoordinatorJob.Execution.FIFO.toString());
- assertEquals(actionList.size(), 1);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for COORD Action_FOR_JOB with Exec Order. actionId =" + actionId + " jobId ="
- + jobId);
- }
- }
-
- private void _testGetActionForJobInLastOnly(String jobId, String actionId) {
- store.beginTrx();
- try {
- List<CoordinatorActionBean> actionList = store.getCoordinatorActionsForJob(jobId, 3,
- CoordinatorJob.Execution.LAST_ONLY.toString());
- assertEquals(actionList.size(), 1);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for COORD Action_FOR_JOB with Exec Order. actionId =" + actionId + " jobId ="
- + jobId);
- }
- }
-
- private void _testGetActionForJob(String jobId, String actionId) {
- store.beginTrx();
- try {
- int coordActionsCount = store.getActionsForCoordinatorJob(jobId, false);
- assertEquals(coordActionsCount, 1);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for COORD Action_FOR_JOB. actionId =" + actionId + " jobId =" + jobId);
- }
- }
-
- private void _testGetAction(String jobId, String actionId) throws StoreException {
- store.beginTrx();
- try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
- assertEquals(jobId, action.getJobId());
- assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
- assertEquals(action.getActionNumber(), 1);
- assertEquals(action.getExternalId(), actionId + "_E");
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for COORD Action. actionId =" + actionId);
- }
- }
-
- private void _testGetRecoveryActionsGroupByJobId(String jobId) throws StoreException {
- store.beginTrx();
- try {
- List<String> jobids = store.getRecoveryActionsGroupByJobId(60);
- assertNotNull(jobids);
- assertEquals(jobId, jobids.get(0));
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for RecoveryActionsGroupByJobId. jobId =" + jobId);
- }
- }
-
- private void _testInsertAction(String jobId, String actionId) {
- CoordinatorActionBean action = createAction(jobId, actionId);
- }
-
- private CoordinatorActionBean createAction(String jobId, String actionId) {
- CoordinatorActionBean action = new CoordinatorActionBean();
- action.setJobId(jobId);
- action.setId(actionId);
- action.setActionNumber(1);
- action.setNominalTime(new Date());
- action.setStatus(Status.READY);
- action.setExternalId(actionId + "_E");
- action.setLastModifiedTime(new Date(new Date().getTime() - 1200000));
- store.beginTrx();
- try {
- store.insertCoordinatorAction(action);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to insert a record into COORD Action ");
- }
- return action;
- }
-
- private void _testUpdateCoordJob(String jobId) {
- store.beginTrx();
- try {
- CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
- int newFreq = Integer.valueOf(job.getFrequency()) + 1;
- job.setFrequency(Integer.toString(newFreq));
- store.updateCoordinatorJob(job);
- store.getEntityManager().flush();
- store.getEntityManager().merge(job);
- job = store.getCoordinatorJob(jobId, false);
- assertEquals(Integer.toString(newFreq), job.getFrequency());
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to UPDATE a record for COORD Job. jobId =" + jobId);
- }
-
- }
-
- private void _testGetMatJobLists() throws StoreException {
- store.beginTrx();
- try {
- Date d1 = new Date();
- Date d2 = new Date(d1.getTime() + 1000);
- List<CoordinatorJobBean> jobList = store.getCoordinatorJobsToBeMaterialized(d2, 50);
- if (jobList.size() == 0) {
- fail("Test of getCoordinatorJobsToBeMaterialized returned no records. Date =" + d2);
- }
- // Assumption: no other older records are there
- d2 = new Date(d1.getTime() - 86400000L * 365L);
- jobList = store.getCoordinatorJobsToBeMaterialized(d2, 50);
- /*
- * if(jobList.size() > 0){ fail("Test of
- * getCoordinatorJobsToBeMaterialized returned some records while
- * expecting no records = " + d2); }
- */
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to Get Materialized Jobs ");
- }
- }
-
- private void _testGetJob(String jobId) throws StoreException {
- store.beginTrx();
- try {
- CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
- assertEquals(jobId, job.getId());
- assertEquals(job.getStatus(), CoordinatorJob.Status.PREP);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to GET a record for COORD Job. jobId =" + jobId);
- }
- }
-
- private void _testInsertJob(String jobId) throws StoreException {
- CoordinatorJobBean job = createCoordJob(jobId);
- store.beginTrx();
- try {
- store.insertCoordinatorJob(job);
- store.commitTrx();
- }
- catch (Exception ex) {
- store.rollbackTrx();
- ex.printStackTrace();
- fail("Unable to insert a record into COORD Job ");
- }
- }
-
- private CoordinatorJobBean createCoordJob(String jobId) {
- CoordinatorJobBean coordJob = new CoordinatorJobBean();
-
- coordJob.setId(jobId);
- coordJob.setAppName("testApp");
- coordJob.setAppPath("testAppPath");
- coordJob.setStatus(CoordinatorJob.Status.PREP);
- coordJob.setCreatedTime(new Date());
- coordJob.setUser("testUser");
- coordJob.setGroup("testGroup");
- String confStr = "<configuration></configuration>";
- coordJob.setConf(confStr);
- String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' name='NAME' frequency=\"1\" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<controls>";
- appXml += "<timeout>10</timeout>";
- appXml += "<concurrency>2</concurrency>";
- appXml += "<execution>LIFO</execution>";
- appXml += "</controls>";
- appXml += "<input-events>";
- appXml += "<data-in name='A' dataset='a'>";
- appXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
- appXml += "</dataset>";
- appXml += "<instance>${coord:latest(0)}</instance>";
- appXml += "</data-in>";
- appXml += "</input-events>";
- appXml += "<output-events>";
- appXml += "<data-out name='LOCAL_A' dataset='local_a'>";
- appXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
- appXml += "</dataset>";
- appXml += "<instance>${coord:current(-1)}</instance>";
- appXml += "</data-out>";
- appXml += "</output-events>";
- appXml += "<action>";
- appXml += "<workflow>";
- appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
- appXml += "<configuration>";
- appXml += "<property>";
- appXml += "<name>inputA</name>";
- appXml += "<value>${coord:dataIn('A')}</value>";
- appXml += "</property>";
- appXml += "<property>";
- appXml += "<name>inputB</name>";
- appXml += "<value>${coord:dataOut('LOCAL_A')}</value>";
- appXml += "</property>";
- appXml += "</configuration>";
- appXml += "</workflow>";
- appXml += "</action>";
- appXml += "</coordinator-app>";
- coordJob.setJobXml(appXml);
- coordJob.setLastActionNumber(0);
- coordJob.setFrequency("1");
- Date curr = new Date();
- coordJob.setNextMaterializedTime(curr);
- coordJob.setLastModifiedTime(curr);
- coordJob.setEndTime(new Date(curr.getTime() + 86400000));
- coordJob.setStartTime(new Date(curr.getTime() - 86400000));
- return coordJob;
- }
-
- /**
- * Helper methods
- *
- * @param jobId
- * @throws StoreException
- */
- private CoordinatorActionBean getCoordAction(String actionId) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
- return action;
- }
- catch (StoreException se) {
- fail("Job ID " + actionId + " was not stored properly in db");
- }finally {
- store.closeTrx();
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index e885892..ac8f1f9 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -633,6 +633,50 @@ public abstract class XDataTestCase extends XHCatTestCase {
return action;
}
+ protected CoordinatorActionBean addRecordToCoordActionTable(CoordinatorActionBean action, String wfId)
+ throws Exception {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordActionInsertJPAExecutor coordActionInsertExecutor = new CoordActionInsertJPAExecutor(action);
+ jpaService.execute(coordActionInsertExecutor);
+
+ if (wfId != null) {
+ WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
+ wfJob.setParentId(action.getId());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob);
+ }
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord action record to table");
+ throw je;
+ }
+ return action;
+ }
+
+ protected CoordinatorJobBean getCoordinatorJob(String jobId) throws Exception{
+ CoordinatorJobBean coordJob = null;
+ try {
+ coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobId);
+ }
+ catch (JPAExecutorException e) {
+ throw new Exception(e);
+ }
+ return coordJob;
+ }
+
+ protected CoordinatorActionBean getCoordinatorAction(String actionId) throws Exception{
+ CoordinatorActionBean cAction = null;
+ try {
+ cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId);
+ }
+ catch (JPAExecutorException e) {
+ throw new Exception(e);
+ }
+ return cAction;
+ }
+
protected CoordinatorActionBean createCoordAction(String jobId, int actionNum, CoordinatorAction.Status status,
String resourceXmlName, int pending) throws Exception {
return createCoordAction(jobId, actionNum, status, resourceXmlName, pending, "Z", null);
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 27cbdc9..e360369 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
+import javax.persistence.FlushModeType;
import javax.persistence.Query;
import junit.framework.TestCase;
@@ -78,7 +79,6 @@ import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.MiniHCatServer.RUNMODE;
import org.apache.oozie.test.hive.MiniHS2;
@@ -794,9 +794,9 @@ public abstract class XTestCase extends TestCase {
}
private void cleanUpDBTablesInternal() throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- EntityManager entityManager = store.getEntityManager();
- store.beginTrx();
+ EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager();
+ entityManager.setFlushMode(FlushModeType.COMMIT);
+ entityManager.getTransaction().begin();
Query q = entityManager.createNamedQuery("GET_WORKFLOWS");
List<WorkflowJobBean> wfjBeans = q.getResultList();
@@ -861,8 +861,8 @@ public abstract class XTestCase extends TestCase {
entityManager.remove(w);
}
- store.commitTrx();
- store.closeTrx();
+ entityManager.getTransaction().commit();
+ entityManager.close();
log.info(wfjSize + " entries in WF_JOBS removed from DB!");
log.info(wfaSize + " entries in WF_ACTIONS removed from DB!");
log.info(cojSize + " entries in COORD_JOBS removed from DB!");
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 096bd4f..7e85b2c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-2163 Remove CoordinatorStore (seoeun25 via bzhang)
OOZIE-2221 Oozie audit log has null id for some of input request (puru)
OOZIE-2223 Improve documentation with regard to Java action retries (ben.roling via bzhang)
OOZIE-2218 META-INF directories in the war file have 777 permissions (rkanter)
[2/2] oozie git commit: OOZIE-2163 Remove CoordinatorStore (seoeun25
via bzhang)
Posted by bz...@apache.org.
OOZIE-2163 Remove CoordinatorStore (seoeun25 via bzhang)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6ea1ed83
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6ea1ed83
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6ea1ed83
Branch: refs/heads/master
Commit: 6ea1ed8331cc519a6e95f784c8a131949c8ff2bf
Parents: 60f3d18
Author: Bowen Zhang <bo...@yahoo.com>
Authored: Fri May 1 13:41:31 2015 -0700
Committer: Bowen Zhang <bo...@yahoo.com>
Committed: Fri May 1 13:42:42 2015 -0700
----------------------------------------------------------------------
.../oozie/service/CoordinatorStoreService.java | 99 --
.../apache/oozie/service/SLAStoreService.java | 1 -
.../org/apache/oozie/service/StoreService.java | 25 +-
.../apache/oozie/store/CoordinatorStore.java | 896 -------------------
core/src/main/resources/oozie-default.xml | 1 -
.../org/apache/oozie/TestCoordinatorEngine.java | 10 +-
.../coord/TestCoordActionsIgnoreXCommand.java | 32 -
.../command/coord/TestCoordChangeXCommand.java | 1 -
.../command/coord/TestCoordELExtensions.java | 6 +-
.../command/coord/TestCoordRerunXCommand.java | 385 ++------
.../command/coord/TestFutureActionsTimeOut.java | 11 +-
.../command/coord/TestPastActionsTimeOut.java | 10 +-
.../oozie/service/TestRecoveryService.java | 66 +-
.../oozie/store/TestCoordinatorStore.java | 403 ---------
.../org/apache/oozie/test/XDataTestCase.java | 44 +
.../java/org/apache/oozie/test/XTestCase.java | 12 +-
release-log.txt | 1 +
17 files changed, 187 insertions(+), 1816 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/main/java/org/apache/oozie/service/CoordinatorStoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordinatorStoreService.java b/core/src/main/java/org/apache/oozie/service/CoordinatorStoreService.java
deleted file mode 100644
index c941813..0000000
--- a/core/src/main/java/org/apache/oozie/service/CoordinatorStoreService.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.service;
-
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.ErrorCode;
-
-/**
- * Base service for persistency of jobs and actions.
- */
-public class CoordinatorStoreService implements Service {
-
- public final static String TRANSIENT_VAR_PREFIX = "oozie.coordinator.";
- public static final String WORKFLOW_BEAN = TRANSIENT_VAR_PREFIX
- + "coordinator.bean";
- final static String ACTION_ID = "action.id";
- final static String ACTIONS_TO_KILL = TRANSIENT_VAR_PREFIX
- + "actions.to.kill";
- final static String ACTIONS_TO_FAIL = TRANSIENT_VAR_PREFIX
- + "actions.to.fail";
- final static String ACTIONS_TO_START = TRANSIENT_VAR_PREFIX
- + "actions.to.start";
-
- /**
- * Return the public interface of the service.
- *
- * @return {@link WorkflowStoreService}.
- */
- public Class<? extends Service> getInterface() {
- return CoordinatorStoreService.class;
- }
-
- /**
- * Return a workflow store instance with a fresh transaction. <p/> The coordinator store has to be committed and then
- * closed to commit changes, if only close it rolls back.
- *
- * @return a coordinator store.
- * @throws StoreException thrown if the workflow store could not be created.
- */
- public CoordinatorStore create() throws StoreException {
- try {
- return new CoordinatorStore(false);
- }
- catch (Exception ex) {
- throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex);
- }
- }
-
- /**
- * Return a workflow store instance with an existing transaction. <p/> The workflow store has to be committed and then
- * closed to commit changes, if only close it rolls back.
- *
- * @return a workflow store.
- * @throws StoreException thrown if the workflow store could not be created.
- */
- // to do this method can be abstract or should be overridden
- public <S extends Store> CoordinatorStore create(S store)
- throws StoreException {
- try {
- return new CoordinatorStore(store, false);
- }
- catch (Exception ex) {
- throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex);
- }
- }
-
- /**
- * Initializes the {@link StoreService}.
- *
- * @param services services instance.
- */
- public void init(Services services) throws ServiceException {
- }
-
- /**
- * Destroy the StoreService
- */
- public void destroy() {
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/main/java/org/apache/oozie/service/SLAStoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SLAStoreService.java b/core/src/main/java/org/apache/oozie/service/SLAStoreService.java
index 75e46a2..22f45f8 100644
--- a/core/src/main/java/org/apache/oozie/service/SLAStoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/SLAStoreService.java
@@ -19,7 +19,6 @@
package org.apache.oozie.service;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.SLAStore;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.StoreException;
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/main/java/org/apache/oozie/service/StoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StoreService.java b/core/src/main/java/org/apache/oozie/service/StoreService.java
index bc8a2b6..7868e02 100644
--- a/core/src/main/java/org/apache/oozie/service/StoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/StoreService.java
@@ -19,12 +19,9 @@
package org.apache.oozie.service;
import org.apache.oozie.store.StoreException;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.service.Services;
import org.apache.oozie.store.SLAStore;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.ErrorCode;
import javax.persistence.EntityManager;
@@ -43,15 +40,8 @@ public class StoreService implements Service {
if (WorkflowStore.class.equals(klass)) {
return (S) Services.get().get(WorkflowStoreService.class).create();
}
- else {
- if (CoordinatorStore.class.equals(klass)) {
- return (S) Services.get().get(CoordinatorStoreService.class).create();
- }
- else {
- if (SLAStore.class.equals(klass)) {
- return (S) Services.get().get(SLAStoreService.class).create();
- }
- }
+ else if (SLAStore.class.equals(klass)) {
+ return (S) Services.get().get(SLAStoreService.class).create();
}
// to do add checks for other stores - coordinator and SLA stores
throw new StoreException(ErrorCode.E0607, " can not get store StoreService.getStore(Class)", "");
@@ -67,15 +57,8 @@ public class StoreService implements Service {
if (WorkflowStore.class.equals(klass)) {
return (S) Services.get().get(WorkflowStoreService.class).create(store);
}
- else {
- if (CoordinatorStore.class.equals(klass)) {
- return (S) Services.get().get(CoordinatorStoreService.class).create(store);
- }
- else {
- if (SLAStore.class.equals(klass)) {
- return (S) Services.get().get(SLAStoreService.class).create(store);
- }
- }
+ else if (SLAStore.class.equals(klass)) {
+ return (S) Services.get().get(SLAStoreService.class).create(store);
}
throw new StoreException(ErrorCode.E0607, " StoreService.getStore(Class, store)", "");
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java b/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
deleted file mode 100644
index 1bb7860..0000000
--- a/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
+++ /dev/null
@@ -1,896 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.store;
-
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.CoordinatorJobInfo;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.Job.Status;
-import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.InstrumentationService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.workflow.WorkflowException;
-import org.apache.openjpa.persistence.OpenJPAPersistence;
-import org.apache.openjpa.persistence.OpenJPAQuery;
-import org.apache.openjpa.persistence.jdbc.FetchDirection;
-import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
-import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
-import org.apache.openjpa.persistence.jdbc.ResultSetType;
-
-/**
- * DB Implementation of Coord Store
- */
-public class CoordinatorStore extends Store {
- private final XLog log = XLog.getLog(getClass());
-
- private EntityManager entityManager;
- private static final String INSTR_GROUP = "db";
- public static final int LOCK_TIMEOUT = 50000;
- private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
-
- public CoordinatorStore(boolean selectForUpdate) throws StoreException {
- super();
- entityManager = getEntityManager();
- }
-
- public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
- super(store);
- entityManager = getEntityManager();
- }
-
- /**
- * Create a CoordJobBean. It also creates the process instance for the job.
- *
- * @param workflow workflow bean
- * @throws StoreException
- */
-
- public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
- ParamChecker.notNull(coordinatorJob, "coordinatorJob");
-
- doOperation("insertCoordinatorJob", new Callable<Void>() {
- public Void call() throws StoreException {
- entityManager.persist(coordinatorJob);
- return null;
- }
- });
- }
-
- /**
- * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
- * Workflow depending on the locking parameter.
- *
- * @param id Job ID
- * @param locking Flag for Table Lock
- * @return CoordinatorJobBean
- * @throws StoreException
- */
- public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
- ParamChecker.notEmpty(id, "CoordJobId");
- CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
- @SuppressWarnings("unchecked")
- public CoordinatorJobBean call() throws StoreException {
- Query q = entityManager.createNamedQuery("GET_COORD_JOB");
- q.setParameter("id", id);
- /*
- * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
- * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
- * FetchPlan fetch = oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // 1 second }
- */
- List<CoordinatorJobBean> cjBeans = q.getResultList();
-
- if (cjBeans.size() > 0) {
- return cjBeans.get(0);
- }
- else {
- throw new StoreException(ErrorCode.E0604, id);
- }
- }
- });
-
- cjBean.setStatus(cjBean.getStatus());
- return cjBean;
- }
-
- /**
- * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
- * argument will be returned.
- *
- * @param d Date
- * @return List of Coordinator Jobs that have a last materialized time older than input date
- * @throws StoreException
- */
- public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
- throws StoreException {
-
- ParamChecker.notNull(d, "Coord Job Materialization Date");
- List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
- new Callable<List<CoordinatorJobBean>>() {
- public List<CoordinatorJobBean> call() throws StoreException {
-
- List<CoordinatorJobBean> cjBeans;
- List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
- try {
- Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
- q.setParameter("matTime", new Timestamp(d.getTime()));
- if (limit > 0) {
- q.setMaxResults(limit);
- }
- /*
- OpenJPAQuery oq = OpenJPAPersistence.cast(q);
- FetchPlan fetch = oq.getFetchPlan();
- fetch.setReadLockMode(LockModeType.WRITE);
- fetch.setLockTimeout(-1); // no limit
- */
- cjBeans = q.getResultList();
- // copy results to a new object
- for (CoordinatorJobBean j : cjBeans) {
- jobList.add(j);
- }
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- return jobList;
-
- }
- });
- return cjBeans;
- }
-
- /**
- * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
- * checkAgeSecs will be returned.
- *
- * @param checkAgeSecs Job age in Seconds
- * @param status Coordinator Job Status
- * @param limit Number of results to return
- * @param locking Flag for Table Lock
- * @return List of Coordinator Jobs that are matched with the parameters.
- * @throws StoreException
- */
- public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
- final int limit, final boolean locking) throws StoreException {
-
- ParamChecker.notNull(status, "Coord Job Status");
- List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
- new Callable<List<CoordinatorJobBean>>() {
- public List<CoordinatorJobBean> call() throws StoreException {
-
- List<CoordinatorJobBean> cjBeans;
- List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
- try {
- Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
- Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
- q.setParameter("lastModTime", ts);
- q.setParameter("status", status);
- if (limit > 0) {
- q.setMaxResults(limit);
- }
- /*
- * if (locking) { OpenJPAQuery oq =
- * OpenJPAPersistence.cast(q); FetchPlan fetch =
- * oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // no limit }
- */
- cjBeans = q.getResultList();
- for (CoordinatorJobBean j : cjBeans) {
- jobList.add(j);
- }
- }
- catch (Exception e) {
- throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
- }
- return jobList;
-
- }
- });
- return cjBeans;
- }
-
- /**
- * Load the CoordinatorAction into a Bean and return it.
- *
- * @param id action ID
- * @return CoordinatorActionBean
- * @throws StoreException
- */
- public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
- ParamChecker.notEmpty(id, "actionID");
- CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
- public CoordinatorActionBean call() throws StoreException {
- Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
- q.setParameter("id", id);
- OpenJPAQuery oq = OpenJPAPersistence.cast(q);
- /*
- * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
- * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // no limit }
- */
-
- CoordinatorActionBean action = null;
- List<CoordinatorActionBean> actions = q.getResultList();
- if (actions.size() > 0) {
- action = actions.get(0);
- }
- else {
- throw new StoreException(ErrorCode.E0605, id);
- }
-
- /*
- * if (locking) return action; else
- */
- return getBeanForRunningCoordAction(action);
- }
- });
- return caBean;
- }
-
- /**
- * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
- * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY, NONE)
- *
- * @param id job ID
- * @param numResults number of results to return
- * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY, NONE
- * @return List of CoordinatorActionBean
- * @throws StoreException
- */
- public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
- final String executionOrder) throws StoreException {
- ParamChecker.notEmpty(id, "jobID");
- List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
- new Callable<List<CoordinatorActionBean>>() {
- public List<CoordinatorActionBean> call() throws StoreException {
-
- List<CoordinatorActionBean> caBeans;
- Query q;
- // check if executionOrder is FIFO, LIFO, NONE or LAST_ONLY
- if (executionOrder.equalsIgnoreCase("FIFO")) {
- q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
- }
- else {
- q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
- }
- q.setParameter("jobId", id);
- // if executionOrder is LAST_ONLY, only retrieve first
- // record in LIFO,
- // otherwise, use numResults if it is positive.
- if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
- q.setMaxResults(1);
- }
- else {
- if (numResults > 0) {
- q.setMaxResults(numResults);
- }
- }
- caBeans = q.getResultList();
- return caBeans;
- }
- });
- return caBeans;
- }
-
- /**
- * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
- * concurrency number.
- *
- * @param id job ID
- * @return Number of running actions
- * @throws StoreException
- */
- public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
- ParamChecker.notEmpty(id, "jobID");
- Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
- public Integer call() throws SQLException {
-
- Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
-
- q.setParameter("jobId", id);
- Long count = (Long) q.getSingleResult();
- return Integer.valueOf(count.intValue());
- }
- });
- return cnt.intValue();
- }
-
- /**
- * Create a new Action record in the ACTIONS table with the given Bean.
- *
- * @param action WorkflowActionBean
- * @throws StoreException If the action is already present
- */
- public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
- ParamChecker.notNull(action, "CoordinatorActionBean");
- doOperation("insertCoordinatorAction", new Callable<Void>() {
- public Void call() throws StoreException {
- entityManager.persist(action);
- return null;
- }
- });
- }
-
- /**
- * Update the given action bean to DB.
- *
- * @param action Action Bean
- * @throws StoreException if action doesn't exist
- */
- public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
- ParamChecker.notNull(action, "CoordinatorActionBean");
- doOperation("updateCoordinatorAction", new Callable<Void>() {
- public Void call() throws StoreException, JPAExecutorException {
- CoordActionQueryExecutor.getInstance().executeUpdate(
- CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION, action);
- return null;
- }
- });
- }
-
- /**
- * Update the given action bean to DB.
- *
- * @param action Action Bean
- * @throws StoreException if action doesn't exist
- */
- public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
- ParamChecker.notNull(action, "CoordinatorActionBean");
- doOperation("updateCoordinatorAction", new Callable<Void>() {
- public Void call() throws StoreException, JPAExecutorException {
- CoordActionQueryExecutor.getInstance().executeUpdate(
- CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
- return null;
- }
- });
- }
-
- /**
- * Update the given coordinator job bean to DB.
- *
- * @param jobbean Coordinator Job Bean
- * @throws StoreException if action doesn't exist
- */
- public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
- ParamChecker.notNull(job, "CoordinatorJobBean");
- doOperation("updateJob", new Callable<Void>() {
- public Void call() throws StoreException, JPAExecutorException {
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- return null;
- }
- });
- }
-
- public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
- ParamChecker.notNull(job, "CoordinatorJobBean");
- doOperation("updateJobStatus", new Callable<Void>() {
- public Void call() throws StoreException, JPAExecutorException {
- CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, job);
- return null;
- }
- });
- }
-
- private <V> V doOperation(String name, Callable<V> command) throws StoreException {
- try {
- Instrumentation.Cron cron = new Instrumentation.Cron();
- cron.start();
- V retVal;
- try {
- retVal = command.call();
- }
- finally {
- cron.stop();
- }
- Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
- return retVal;
- }
- catch (StoreException ex) {
- throw ex;
- }
- catch (SQLException ex) {
- throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
- }
- catch (Exception e) {
- throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
- }
- }
-
- /**
- * Purge the coordinators completed older than given days.
- *
- * @param olderThanDays number of days for which to preserve the coordinators
- * @param limit maximum number of coordinator jobs to be purged
- * @throws StoreException
- */
- public void purge(final long olderThanDays, final int limit) throws StoreException {
- doOperation("coord-purge", new Callable<Void>() {
- public Void call() throws SQLException, StoreException, WorkflowException {
- Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
- Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
- jobQ.setParameter("lastModTime", lastModTm);
- jobQ.setMaxResults(limit);
- List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
-
- int actionDeleted = 0;
- if (coordJobs.size() != 0) {
- for (CoordinatorJobBean coord : coordJobs) {
- String jobId = coord.getId();
- entityManager.remove(coord);
- Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
- g.setParameter("jobId", jobId);
- actionDeleted += g.executeUpdate();
- }
- }
-
- XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
- return null;
- }
- });
- }
-
- public void commit() throws StoreException {
- }
-
- public void close() throws StoreException {
- }
-
- public CoordinatorJobBean getCoordinatorJobs(String id) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
- throws StoreException {
-
- CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
- public CoordinatorJobInfo call() throws SQLException, StoreException {
- List<String> orArray = new ArrayList<String>();
- List<String> colArray = new ArrayList<String>();
- List<String> valArray = new ArrayList<String>();
- StringBuilder sb = new StringBuilder("");
-
- StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
- StoreStatusFilter.coordCountStr);
-
- int realLen = 0;
-
- Query q = null;
- Query qTotal = null;
- if (orArray.size() == 0) {
- q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
- q.setFirstResult(start - 1);
- q.setMaxResults(len);
- qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
- }
- else {
- StringBuilder sbTotal = new StringBuilder(sb);
- sb.append(" order by w.createdTimestamp desc ");
- XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
- q = entityManager.createQuery(sb.toString());
- q.setFirstResult(start - 1);
- q.setMaxResults(len);
- qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
- StoreStatusFilter.coordCountStr));
- }
-
- for (int i = 0; i < orArray.size(); i++) {
- q.setParameter(colArray.get(i), valArray.get(i));
- qTotal.setParameter(colArray.get(i), valArray.get(i));
- }
-
- OpenJPAQuery kq = OpenJPAPersistence.cast(q);
- JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
- fetch.setFetchBatchSize(20);
- fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
- fetch.setFetchDirection(FetchDirection.FORWARD);
- fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
- List<?> resultList = q.getResultList();
- List<Object[]> objectArrList = (List<Object[]>) resultList;
- List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
-
- for (Object[] arr : objectArrList) {
- CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
- coordBeansList.add(ww);
- }
-
- realLen = ((Long) qTotal.getSingleResult()).intValue();
-
- return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
- }
- });
- return coordJobInfo;
- }
-
- private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
- CoordinatorJobBean bean = new CoordinatorJobBean();
- bean.setId((String) arr[0]);
- if (arr[1] != null) {
- bean.setAppName((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setStatus(Status.valueOf((String) arr[2]));
- }
- if (arr[3] != null) {
- bean.setUser((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setGroup((String) arr[4]);
- }
- if (arr[5] != null) {
- bean.setStartTime((Timestamp) arr[5]);
- }
- if (arr[6] != null) {
- bean.setEndTime((Timestamp) arr[6]);
- }
- if (arr[7] != null) {
- bean.setAppPath((String) arr[7]);
- }
- if (arr[8] != null) {
- bean.setConcurrency(((Integer) arr[8]).intValue());
- }
- if (arr[9] != null) {
- bean.setFrequency((String) arr[9]);
- }
- if (arr[10] != null) {
- bean.setLastActionTime((Timestamp) arr[10]);
- }
- if (arr[11] != null) {
- bean.setNextMaterializedTime((Timestamp) arr[11]);
- }
- if (arr[13] != null) {
- bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
- }
- if (arr[14] != null) {
- bean.setTimeZone((String) arr[14]);
- }
- if (arr[15] != null) {
- bean.setTimeout((Integer) arr[15]);
- }
- return bean;
- }
-
- /**
- * Loads all actions for the given Coordinator job.
- *
- * @param jobId coordinator job id
- * @param locking true if Actions are to be locked
- * @return A List of CoordinatorActionBean
- * @throws StoreException
- */
- public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
- throws StoreException {
- ParamChecker.notEmpty(jobId, "CoordinatorJobID");
- Integer actionsCount = doOperation("getActionsForCoordinatorJob",
- new Callable<Integer>() {
- @SuppressWarnings("unchecked")
- public Integer call() throws StoreException {
- List<CoordinatorActionBean> actions;
- List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
- try {
- Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
- q.setParameter("jobId", jobId);
- /*
- * if (locking) { //
- * q.setHint("openjpa.FetchPlan.ReadLockMode", //
- * "READ"); OpenJPAQuery oq =
- * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
- * (JDBCFetchPlan) oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // 1 second }
- */
- Long count = (Long) q.getSingleResult();
- return Integer.valueOf(count.intValue());
- /*actions = q.getResultList();
- for (CoordinatorActionBean a : actions) {
- CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
- actionList.add(aa);
- }*/
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- /*
- * if (locking) { return actions; } else {
- */
-
- // }
- }
- });
- return actionsCount;
- }
-
- /**
- * Loads given number of actions for the given Coordinator job.
- *
- * @param jobId coordinator job id
- * @param start offset for select statement
- * @param len number of Workflow Actions to be returned
- * @return A List of CoordinatorActionBean
- * @throws StoreException
- */
- public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
- final int len) throws StoreException {
- ParamChecker.notEmpty(jobId, "CoordinatorJobID");
- List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
- new Callable<List<CoordinatorActionBean>>() {
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> call() throws StoreException {
- List<CoordinatorActionBean> actions;
- List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
- try {
- Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
- q.setParameter("jobId", jobId);
- q.setFirstResult(start - 1);
- q.setMaxResults(len);
- actions = q.getResultList();
- for (CoordinatorActionBean a : actions) {
- CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
- actionList.add(aa);
- }
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- return actionList;
- }
- });
- return actions;
- }
-
- protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
- if (a != null) {
- CoordinatorActionBean action = new CoordinatorActionBean();
- action.setId(a.getId());
- action.setActionNumber(a.getActionNumber());
- action.setActionXmlBlob(a.getActionXmlBlob());
- action.setConsoleUrl(a.getConsoleUrl());
- action.setCreatedConfBlob(a.getCreatedConfBlob());
- action.setErrorCode(a.getErrorCode());
- action.setErrorMessage(a.getErrorMessage());
- action.setExternalStatus(a.getExternalStatus());
- action.setMissingDependenciesBlob(a.getMissingDependenciesBlob());
- action.setRunConfBlob(a.getRunConfBlob());
- action.setTimeOut(a.getTimeOut());
- action.setTrackerUri(a.getTrackerUri());
- action.setType(a.getType());
- action.setCreatedTime(a.getCreatedTime());
- action.setExternalId(a.getExternalId());
- action.setJobId(a.getJobId());
- action.setLastModifiedTime(a.getLastModifiedTime());
- action.setNominalTime(a.getNominalTime());
- action.setSlaXmlBlob(a.getSlaXmlBlob());
- action.setStatus(a.getStatus());
- return action;
- }
- return null;
- }
-
- public CoordinatorActionBean getAction(String id, boolean b) {
- return null;
- }
-
-
- public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
- throws StoreException {
- ParamChecker.notEmpty(jobId, "CoordinatorJobID");
- List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
- new Callable<List<CoordinatorActionBean>>() {
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> call() throws StoreException {
- List<CoordinatorActionBean> actions;
- try {
- Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
- q.setParameter("jobId", jobId);
- /*
- * if (locking) {
- * q.setHint("openjpa.FetchPlan.ReadLockMode",
- * "READ"); OpenJPAQuery oq =
- * OpenJPAPersistence.cast(q); FetchPlan fetch =
- * oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // no limit }
- */
- actions = q.getResultList();
- return actions;
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
- });
- return actions;
- }
-
- public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
- throws StoreException {
- List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
- new Callable<List<CoordinatorActionBean>>() {
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> call() throws StoreException {
- List<CoordinatorActionBean> actions;
- Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
- try {
- Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
- q.setParameter("lastModifiedTime", ts);
- /*
- * if (locking) { OpenJPAQuery oq =
- * OpenJPAPersistence.cast(q); FetchPlan fetch =
- * oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // no limit }
- */
- actions = q.getResultList();
- return actions;
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
- });
- return actions;
- }
-
- public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
- throws StoreException {
- List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
- new Callable<List<CoordinatorActionBean>>() {
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> call() throws StoreException {
- List<CoordinatorActionBean> actions;
- try {
- Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
- Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
- q.setParameter("lastModifiedTime", ts);
- /*
- * if (locking) { OpenJPAQuery oq =
- * OpenJPAPersistence.cast(q); FetchPlan fetch =
- * oq.getFetchPlan();
- * fetch.setReadLockMode(LockModeType.WRITE);
- * fetch.setLockTimeout(-1); // no limit }
- */
- actions = q.getResultList();
- return actions;
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
- });
- return actions;
- }
-
- /**
- * Get coordinator action beans for given start date and end date
- *
- * @param startDate
- * @param endDate
- * @return list of coordinator action beans
- * @throws StoreException
- */
- public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
- final Date endDate) throws StoreException {
- List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
- new Callable<List<CoordinatorActionBean>>() {
- @SuppressWarnings("unchecked")
- public List<CoordinatorActionBean> call() throws StoreException {
- List<CoordinatorActionBean> actions;
- try {
- Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
- q.setParameter("jobId", jobId);
- q.setParameter("startTime", new Timestamp(startDate.getTime()));
- q.setParameter("endTime", new Timestamp(endDate.getTime()));
- actions = q.getResultList();
- List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
- for (CoordinatorActionBean a : actions) {
- CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
- actionList.add(aa);
- }
- return actionList;
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
- });
- return actions;
- }
-
- /**
- * Get coordinator action bean for given date
- *
- * @param nominalTime
- * @return CoordinatorActionBean
- * @throws StoreException
- */
- public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
- throws StoreException {
- CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
- new Callable<CoordinatorActionBean>() {
- @SuppressWarnings("unchecked")
- public CoordinatorActionBean call() throws StoreException {
- List<CoordinatorActionBean> actions;
- Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
- q.setParameter("jobId", jobId);
- q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
- actions = q.getResultList();
-
- CoordinatorActionBean action = null;
- if (actions.size() > 0) {
- action = actions.get(0);
- }
- else {
- throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
- }
- return getBeanForRunningCoordAction(action);
- }
- });
- return action;
- }
-
- public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
- List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
- @SuppressWarnings("unchecked")
- public List<String> call() throws StoreException {
- List<String> jobids = new ArrayList<String>();
- try {
- Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
- Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
- q.setParameter("lastModifiedTime", ts);
- List<Object[]> list = q.getResultList();
-
- for (Object[] arr : list) {
- if (arr != null && arr[0] != null) {
- jobids.add((String) arr[0]);
- }
- }
-
- return jobids;
- }
- catch (IllegalStateException e) {
- throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
- }
- }
- });
- return jobids;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 1048adc..8d59182 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -111,7 +111,6 @@
org.apache.oozie.service.LiteWorkflowAppService,
org.apache.oozie.service.JPAService,
org.apache.oozie.service.StoreService,
- org.apache.oozie.service.CoordinatorStoreService,
org.apache.oozie.service.SLAStoreService,
org.apache.oozie.service.DBLiteWorkflowStoreService,
org.apache.oozie.service.CallbackService,
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
index 8b5bee0..c6c9c49 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XConfiguration;
@@ -389,11 +389,11 @@ public class TestCoordinatorEngine extends XTestCase {
* @throws StoreException
*/
private void checkCoordJob(String jobId) throws StoreException {
- CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
try {
- store.getCoordinatorJob(jobId, false);
+ CoordinatorJobBean job = CoordJobQueryExecutor.getInstance()
+ .get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobId);
}
- catch (StoreException se) {
+ catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + jobId + " was not stored properly in db");
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsIgnoreXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsIgnoreXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsIgnoreXCommand.java
index 2ff89b2..21c0c37 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsIgnoreXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsIgnoreXCommand.java
@@ -20,54 +20,22 @@ package org.apache.oozie.command.coord;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.CoordinatorJob.Execution;
-import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.coord.CoordELFunctions;
-import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.LiteWorkflowStoreService;
-import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StatusTransitService;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.workflow.WorkflowApp;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.apache.oozie.workflow.WorkflowLib;
-import org.apache.oozie.workflow.lite.EndNodeDef;
-import org.apache.oozie.workflow.lite.LiteWorkflowApp;
-import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
-import org.apache.oozie.workflow.lite.StartNodeDef;
-import org.jdom.Element;
-import org.jdom.JDOMException;
public class TestCoordActionsIgnoreXCommand extends XDataTestCase {
private Services services;
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 5d779b4..7c154c8 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -830,7 +830,6 @@ public class TestCoordChangeXCommand extends XDataTestCase {
}
private void addRecordToJobTable(String jobId) throws Exception {
- // CoordinatorStore store = new CoordinatorStore(false);
CoordinatorJobBean coordJob = new CoordinatorJobBean();
coordJob.setId(jobId);
coordJob.setAppName("testApp");
http://git-wip-us.apache.org/repos/asf/oozie/blob/6ea1ed83/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
index f9a276f..552d832 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
@@ -25,7 +25,6 @@ import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.service.Services;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
@@ -64,14 +63,13 @@ public class TestCoordELExtensions extends XDataTestCase {
}
protected CoordinatorActionBean checkCoordAction(String actionId) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
+ CoordinatorActionBean action = getCoordinatorAction(actionId);
assertEquals(
"file://#testDir/2009/03/06/00/_SUCCESS#file://#testDir/2009/03/05/23/_SUCCESS",
action.getMissingDependencies());
return action;
- } catch (StoreException se) {
+ } catch (Exception se) {
se.printStackTrace();
fail("Action ID " + actionId + " was not stored properly in db");
}