You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/14 16:01:32 UTC
[2/5] falcon git commit: FALCON-1151 Migrate oozie related methods
from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 3f6dc66..a5a89d7 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -129,13 +129,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
@@ -159,11 +160,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
waitingForBundleFinish(cluster3, oldBundleId, 5);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
}
@@ -182,12 +183,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes =
- OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
@@ -201,7 +202,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
bundles[1].setProcessValidity(newStartTime, newEndTime);
bundles[1].setProcessConcurrency(10);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
while (Util.parseResponse(
@@ -213,7 +214,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(10);
}
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
dualComparison(prism, cluster3, bundles[1].getProcessData());
@@ -222,7 +223,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(TimeUtil
@@ -240,7 +241,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
- Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
expectedNumberOfWorkflows);
}
@@ -252,9 +253,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(25);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -285,7 +286,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
while (Util.parseResponse(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()))
@@ -302,7 +303,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
int expectedInstances =
@@ -328,13 +329,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
@@ -347,7 +348,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
ServiceResponse response =
prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
@@ -366,15 +367,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String originalProcessData = bundles[1].getProcessData();
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
TimeUtil.sleepSeconds(20);
List<String> oldNominalTimes =
- OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
bundles[1].setProcessName(this.getClass().getSimpleName() + "-myNewProcessName");
//now to update
@@ -382,7 +383,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData());
AssertUtil.assertFailed(response);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
originalProcessData, false, false);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -400,18 +401,18 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
//now to update
DateTime updateTime = new DateTime(DateTimeZone.UTC);
TimeUtil.sleepSeconds(60);
List<String> oldNominalTimes =
- OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
LOGGER.info("updating at " + updateTime);
while (Util
.parseResponse(updateProcessConcurrency(bundles[1],
@@ -428,7 +429,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated
// correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(),
false, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -443,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
&&
status != Job.Status.DONEWITHERROR) {
int statusCount = InstanceUtil
- .getInstanceCountWithStatus(cluster3,
+ .getInstanceCountWithStatus(cluster3OC,
bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS);
@@ -468,7 +469,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
- Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
expectedNumberOfInstances);
}
@@ -484,11 +485,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -500,7 +501,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getStart()),
newEndTime);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
ServiceResponse response = prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData());
@@ -514,12 +515,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Assert.assertEquals(Util.parseResponse(response).getStatus(),
APIResult.Status.SUCCEEDED, "Process update did not succeed.");
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
int i = 0;
- while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId)
+ while (OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId)
!= getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
@@ -540,7 +541,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
// one is updated
// correctly.
int finalNumberOfInstances = InstanceUtil
- .getProcessInstanceList(cluster3,
+ .getProcessInstanceList(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
@@ -567,15 +568,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
AssertUtil.assertSucceeded(
@@ -595,7 +596,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
@@ -610,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
&&
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
- .getInstanceCountWithStatus(cluster3,
+ .getInstanceCountWithStatus(cluster3OC,
bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
@@ -627,7 +628,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
int expectedInstances =
@@ -660,11 +661,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
//now to update
@@ -707,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
&&
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
- .getInstanceCountWithStatus(cluster3,
+ .getInstanceCountWithStatus(cluster3OC,
bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
@@ -721,9 +722,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
- OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS),
+ OozieUtil.verifyNewBundleCreation(cluster3OC, OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS),
oldNominalTimes, bundles[1].getProcessData(), false,
true
);
@@ -731,7 +732,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
int expectedInstances =
@@ -761,11 +762,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -793,11 +794,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
@@ -826,13 +827,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -864,11 +865,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS).size();
int expectedInstances =
@@ -897,13 +898,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(20);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
@@ -921,7 +922,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(20);
}
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
@@ -931,7 +932,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -947,14 +948,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
- InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
@@ -974,7 +975,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(10);
}
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
@@ -986,7 +987,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -1012,12 +1013,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- Util.readEntityName(originalProcess), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ Util.readEntityName(originalProcess), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
//submit new feed
@@ -1037,12 +1038,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, false);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
Job.Status.RUNNING);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
while (Util.parseResponse(
prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus()
@@ -1053,13 +1054,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess));
bundles[1].verifyDependencyListing(cluster2);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
updatedProcess, true, false);
waitingForBundleFinish(cluster3, oldBundleId);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
Job.Status.RUNNING);
@@ -1074,13 +1075,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1098,7 +1099,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("update didnt SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
bundles[1].verifyDependencyListing(cluster2);
@@ -1108,7 +1109,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances = InstanceUtil
- .getProcessInstanceList(cluster3,
+ .getProcessInstanceList(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
@@ -1123,7 +1124,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
newEndTime);
- Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId),
expectedNumberOfWorkflows);
}
@@ -1137,12 +1138,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
@@ -1171,7 +1172,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances = InstanceUtil
- .getProcessInstanceList(cluster3,
+ .getProcessInstanceList(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
@@ -1204,13 +1205,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes =
- OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
@@ -1232,7 +1233,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated
// correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -1255,12 +1256,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
@@ -1281,7 +1282,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -1296,11 +1297,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId,
EntityType.PROCESS);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1312,13 +1313,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getEnd()
));
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
AssertUtil.assertSucceeded(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
@@ -1331,12 +1332,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
- OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
+ OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId);
String oldStartTime = TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
@@ -1350,7 +1351,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getEnd()
));
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
@@ -1367,14 +1368,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances =
- InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, bundles[1].getProcessName(),
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(),
EntityType.PROCESS).size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(oldStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
Assert.assertEquals(InstanceUtil
- .getProcessInstanceList(cluster3,
+ .getProcessInstanceList(cluster3OC,
bundles[1].getProcessName(), EntityType.PROCESS)
.size(), getExpectedNumberOfWorkflowInstances(newStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
@@ -1389,9 +1390,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
- String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- bundles[1].getProcessName(), EntityType.PROCESS);
+ String oldBundleId = OozieUtil
+ .getLatestBundleID(cluster3OC,
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1404,7 +1405,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
- waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
@@ -1413,9 +1414,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
List<String> oldNominalTimes =
- OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS);
- OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
bundles[1].verifyDependencyListing(cluster2);
@@ -1448,11 +1449,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
EntityType.PROCESS);
//save old data
- String oldBundleID = InstanceUtil
- .getLatestBundleID(cluster1,
- Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
+ String oldBundleID = OozieUtil
+ .getLatestBundleID(cluster1OC,
+ Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
- List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC,
oldBundleID,
EntityType.PROCESS);
@@ -1467,7 +1468,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
TimeUtil.sleepSeconds(20);
//verify new bundle creation
- OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
+ OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes,
b.getProcessData(), true, true);
} finally {
@@ -1529,26 +1530,26 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
}
- private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle,
+ private void waitForProcessToReachACertainState(OozieClient oozieClient, Bundle bundle,
Job.Status state)
throws Exception {
- while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
+ while (OozieUtil.getOozieJobStatus(oozieClient,
bundle.getProcessName(), EntityType.PROCESS) != state) {
//keep waiting
TimeUtil.sleepSeconds(10);
}
//now check if the coordinator is in desired state
- CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil
- .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
- EntityType.PROCESS));
+ CoordinatorJob coord = getDefaultOozieCoord(oozieClient, OozieUtil
+ .getLatestBundleID(oozieClient, bundle.getProcessName(),
+ EntityType.PROCESS));
while (coord.getStatus() != state) {
TimeUtil.sleepSeconds(10);
- coord = getDefaultOozieCoord(coloHelper, InstanceUtil
- .getLatestBundleID(coloHelper, bundle.getProcessName(),
- EntityType.PROCESS));
+ coord = getDefaultOozieCoord(oozieClient, OozieUtil
+ .getLatestBundleID(oozieClient, bundle.getProcessName(),
+ EntityType.PROCESS));
}
}
@@ -1630,14 +1631,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
waitingForBundleFinish(coloHelper, bundleId, 15);
}
- private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId)
- throws Exception {
- OozieClient client = coloHelper.getFeedHelper().getOozieClient();
- BundleJob bundlejob = client.getBundleJobInfo(bundleId);
-
+ private CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId) throws Exception {
+ BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId);
for (CoordinatorJob coord : bundlejob.getCoordinators()) {
if (coord.getAppName().contains("DEFAULT")) {
- return client.getCoordJobInfo(coord.getId());
+ return oozieClient.getCoordJobInfo(coord.getId());
}
}
return null;
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index 39f0268..944c67f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -284,7 +284,7 @@ public class OptionalInputTest extends BaseTestClass {
prism.getProcessHelper().update(process.toString(), process.toString());
//from now on ... it should wait of input0 also
- InstanceUtil.waitTillInstancesAreCreated(cluster, process.toString(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(serverOC.get(0), process.toString(), 0);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS, 10);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
index 4221525..1bc4027 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
@@ -28,11 +28,13 @@ import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -51,6 +53,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
private ColoHelper cluster2 = servers.get(1);
private ColoHelper cluster3 = servers.get(2);
+ private OozieClient cluster1OC = serverOC.get(0);
private FileSystem cluster1FS = serverFS.get(0);
private FileSystem cluster2FS = serverFS.get(1);
private FileSystem cluster3FS = serverFS.get(2);
@@ -124,23 +127,22 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
.withPartition("UK/${cluster.colo}")
.build()).toString();
-
LOGGER.info("feed: " + Util.prettyPrintXml(feed));
prism.getFeedHelper().submitAndSchedule(feed);
TimeUtil.sleepSeconds(10);
String bundleId =
- InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+ OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
//wait till 1st instance of replication coord is SUCCEEDED
- List<String> replicationCoordIDTarget = InstanceUtil
+ List<String> replicationCoordIDTarget = OozieUtil
.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0)
== WorkflowJob.Status.SUCCEEDED) {
break;
@@ -151,10 +153,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepSeconds(15);
List<String> inputFolderListForColo1 =
- InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(0), 1);
List<String> inputFolderListForColo2 =
- InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(1), 1);
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
@@ -216,16 +218,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepSeconds(60);
//wait till 1st instance of replication coord is SUCCEEDED
- String bundleId = InstanceUtil
- .getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
-
- List<String> replicationCoordIDTarget = InstanceUtil.getReplicationCoordID(bundleId,
+ String bundleId = OozieUtil
+ .getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
+ List<String> replicationCoordIDTarget = OozieUtil.getReplicationCoordID(bundleId,
cluster1.getFeedHelper());
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0)
== WorkflowJob.Status.SUCCEEDED) {
break;
@@ -234,19 +235,19 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepSeconds(20);
}
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(0), 0),
WorkflowJob.Status.SUCCEEDED);
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0),
WorkflowJob.Status.SUCCEEDED);
TimeUtil.sleepSeconds(15);
List<String> inputFolderListForColo1 = InstanceUtil
- .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1);
+ .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1);
List<String> inputFolderListForColo2 = InstanceUtil
- .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1);
+ .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1);
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
inputFolderListForColo1);
@@ -258,17 +259,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//check for run id to be 1
Assert.assertEquals(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0),
1, "id has to be equal 1");
Assert.assertEquals(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0),
1, "id has to be equal 1");
//wait for lates run to complete
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0)
== WorkflowJob.Status.SUCCEEDED) {
break;
@@ -276,10 +277,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
LOGGER.info("still in for loop");
TimeUtil.sleepSeconds(20);
}
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(0), 0),
WorkflowJob.Status.SUCCEEDED);
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0),
WorkflowJob.Status.SUCCEEDED);
@@ -296,10 +297,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//check for run id to be 2
Assert.assertEquals(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0),
2, "id has to be equal 2");
Assert.assertEquals(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0),
2, "id has to be equal 2");
}
@@ -381,15 +382,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//wait till 1st instance of replication coord is SUCCEEDED
String bundleId =
- InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+ OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
List<String> replicationCoordIDTarget =
- InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+ OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0)
== WorkflowJob.Status.SUCCEEDED) {
break;
@@ -398,10 +399,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepSeconds(20);
}
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
"Replication job should have succeeded.");
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
"Replication job should have succeeded.");
@@ -411,14 +412,14 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
// be present. both of them should have _success
List<String> inputFolderListForColo1 = InstanceUtil
- .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1);
+ .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1);
List<String> inputFolderListForColo2 = InstanceUtil
- .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1);
+ .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1);
String outPutLocation = InstanceUtil
- .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 0);
+ .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0);
String outPutBaseLocation = InstanceUtil
- .getOutputFolderBaseForInstanceForReplication(cluster1,
+ .getOutputFolderBaseForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(0), 0);
List<String> subFolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
@@ -439,17 +440,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//check for run id to be 1
Assert.assertTrue(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
== 1
- && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
0) == 1,
"id have to be equal 1");
//wait for latest run to complete
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
break;
}
@@ -467,9 +468,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
//check for run id to be 2
Assert.assertTrue(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
== 2
- && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
0) == 2,
"id have to be equal 2");
}
@@ -571,15 +572,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//wait till 1st instance of replication coord is SUCCEEDED
String bundleId =
- InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+ OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED);
List<String> replicationCoordIDTarget =
- InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+ OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
for (int i = 0; i < 30; i++) {
- if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0),
0) == WorkflowJob.Status.SUCCEEDED
- && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
break;
}
@@ -588,10 +589,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
TimeUtil.sleepSeconds(20);
}
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
"Replication job did not succeed");
- Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC,
replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
"Replication job did not succeed");
@@ -601,17 +602,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
be present. both of
them should have _success */
List<String> inputFolderListForColo1 =
- InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(0), 1);
List<String> inputFolderListForColo2 =
- InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(1), 1);
String outPutLocation = InstanceUtil
- .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+ .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0),
0);
String outPutBaseLocation = InstanceUtil
- .getOutputFolderBaseForInstanceForReplication(cluster1,
+ .getOutputFolderBaseForInstanceForReplication(cluster1OC,
replicationCoordIDTarget.get(0), 0);
List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
@@ -634,9 +635,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass {
//check for run id to be 1
Assert.assertTrue(
- InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0)
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0)
== 1
- && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1),
0) == 1,
"id have to be equal 1");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index c6f72cc..43aafdf 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -23,13 +23,13 @@ import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -54,14 +54,12 @@ import java.util.List;
@Test(groups = "distributed")
public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
- private ColoHelper cluster1 = servers.get(0);
- private ColoHelper cluster2 = servers.get(1);
- private ColoHelper cluster3 = servers.get(2);
private FileSystem cluster1FS = serverFS.get(0);
private FileSystem cluster2FS = serverFS.get(1);
private FileSystem cluster3FS = serverFS.get(2);
private OozieClient cluster1OC = serverOC.get(0);
private OozieClient cluster2OC = serverOC.get(1);
+ private OozieClient cluster3OC = serverOC.get(2);
private String testDate = "/2012/10/01/12/";
private String baseTestDir = cleanAndGetTestDir();
private String testBaseDir1 = baseTestDir + "/localDC/rc/billing";
@@ -83,7 +81,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
// pt : partition in target
// ps: partition in source
-
private void uploadDataToServer3(String location, String fileName) throws IOException {
HadoopUtil.recreateDir(cluster3FS, location);
HadoopUtil.copyDataToFolder(cluster3FS, location, fileName);
@@ -96,9 +93,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
LOGGER.info("creating test data");
-
uploadDataToServer3(testDirWithDate + "00/ua2/", testFile1);
uploadDataToServer3(testDirWithDate + "05/ua2/", testFile2);
uploadDataToServer3(testDirWithDate + "10/ua2/", testFile3);
@@ -123,23 +118,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
uploadDataToServer3(testBaseDir3 + testDate + "15/ua2/", testFile4);
uploadDataToServer3(testBaseDir3 + testDate + "20/ua2/", testFile4);
-
uploadDataToServer3(testBaseDir3 + testDate + "00/ua1/", testFile1);
uploadDataToServer3(testBaseDir3 + testDate + "05/ua1/", testFile2);
uploadDataToServer3(testBaseDir3 + testDate + "10/ua1/", testFile3);
uploadDataToServer3(testBaseDir3 + testDate + "15/ua1/", testFile4);
uploadDataToServer3(testBaseDir3 + testDate + "20/ua1/", testFile4);
-
uploadDataToServer3(testBaseDir3 + testDate + "00/ua3/", testFile1);
uploadDataToServer3(testBaseDir3 + testDate + "05/ua3/", testFile2);
uploadDataToServer3(testBaseDir3 + testDate + "10/ua3/", testFile3);
uploadDataToServer3(testBaseDir3 + testDate + "15/ua3/", testFile4);
uploadDataToServer3(testBaseDir3 + testDate + "20/ua3/", testFile4);
-
//data for test normalTest_1s2t_pst where both source target partition are required
-
uploadDataToServer3(testDirWithDateSourceTarget + "00/ua3/ua2/", testFile1);
uploadDataToServer3(testDirWithDateSourceTarget + "05/ua3/ua2/", testFile2);
uploadDataToServer3(testDirWithDateSourceTarget + "10/ua3/ua2/", testFile3);
@@ -156,22 +147,17 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
uploadDataToServer1(testDirWithDateSource1 + "00/ua2/", testFile1);
uploadDataToServer1(testDirWithDateSource1 + "05/ua2/", testFile2);
-
uploadDataToServer1(testDirWithDateSource1 + "00/ua1/", testFile1);
uploadDataToServer1(testDirWithDateSource1 + "05/ua1/", testFile2);
-
uploadDataToServer1(testDirWithDateSource1 + "00/ua3/", testFile1);
uploadDataToServer1(testDirWithDateSource1 + "05/ua3/", testFile2);
-
LOGGER.info("completed creating test data");
-
}
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
for (int i = 0; i < 3; i++) {
bundles[i] = new Bundle(bundle, servers.get(i));
bundles[i].generateUniqueBundle(this);
@@ -194,9 +180,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
// replication takes
// place normally
//partition is left blank
-
Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
-
String startTimeUA1 = "2012-10-01T12:05Z";
String startTimeUA2 = "2012-10-01T12:10Z";
@@ -234,8 +218,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
- AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source "
- + "is blank");
+ AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source is blank");
}
@@ -290,42 +273,26 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
r = prism.getFeedHelper().schedule(feed.toString());
AssertUtil.assertSucceeded(r);
TimeUtil.sleepSeconds(15);
-
HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "00/ua3/");
HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "05/ua3/");
- HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/",
- testFile1);
- HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/",
- testFile2);
+ HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/", testFile1);
+ HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/", testFile2);
InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
- Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
- "REPLICATION"), 1);
- Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
- Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
- Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
-
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1);
//check if data has been replicated correctly
-
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua2ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua2");
-
List<Path> ua3ReplicatedData00 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua3/"));
List<Path> ua3ReplicatedData05 = HadoopUtil
@@ -388,31 +355,18 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
- "REPLICATION"), 1);
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
- "RETENTION"), 1);
-
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1);
//check if data has been replicated correctly
-
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua2ReplicatedData =
HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
-
List<Path> ua3ReplicatedData00 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua2/"));
List<Path> ua3ReplicatedData05 = HadoopUtil
@@ -439,7 +393,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
// (00 to 30)
//data should be replicated to folder on cluster1 and cluster2 as targets
//ua3 is the source and ua1 and ua2 are target
-
Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
String startTimeUA1 = "2012-10-01T12:05Z";
String startTimeUA2 = "2012-10-01T12:10Z";
@@ -493,19 +446,16 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua1ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate));
+
//check for no ua2 or ua3 in ua1
AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2", "ua3");
List<Path> ua2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS,
- new Path(testBaseDir3 + testDate));
+ .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3");
-
List<Path> ua1ReplicatedData00 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate + "00/"));
List<Path> ua1ReplicatedData10 = HadoopUtil
@@ -539,7 +489,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//cluster2 is the target
// Since there is no partition expression in source clusters, the feed submission should
// fail (FALCON-305).
-
Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
String startTimeUA1 = "2012-10-01T12:05Z";
@@ -650,18 +599,15 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua1ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
+
//check for no ua2 or ua3 in ua1
AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2");
-
List<Path> ua2ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
-
List<Path> ua1ReplicatedData05 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS,
new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
@@ -687,7 +633,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1);
AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2);
AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2);
-
}
@@ -702,7 +647,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//data should be replicated to cluster2 from ua2 sub dir of cluster3 and cluster1
// source cluster path in cluster1 should be mentioned in cluster definition
// path for data in target cluster should also be customized
-
Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
String startTimeUA1 = "2012-10-01T12:00Z";
@@ -754,8 +698,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua2ReplicatedData = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
new Path(testBaseDir2 + "/replicated" + testDate));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua2");
@@ -765,7 +707,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
List<Path> ua2ReplicatedData05ua3 = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS,
new Path(testBaseDir2 + "/replicated" + testDate + "05/ua3/"));
-
List<Path> ua1OriginalData00 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(
testBaseDirServer1Source + testDate + "00/ua1"));
@@ -776,11 +717,8 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
AssertUtil.checkForListSizes(ua2ReplicatedData05ua3, ua3OriginalData05);
}
-
@Test(enabled = true)
public void normalTest1Source2TargetPartitionedSourceTarget() throws Exception {
-
-
//this test is for ideal condition when data is present in all the required places and
// replication takes
// place normally
@@ -843,8 +781,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
//on ua1 only ua1 should be replicated, ua2 only ua2
//number of files should be same as source
-
-
List<Path> ua1ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate));
//check for no ua2 in ua1
@@ -854,7 +790,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate));
AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1");
-
List<Path> ua1ReplicatedData05 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS,
new Path(testBaseDir1 + "/ua1" + testDate + "05/"));
@@ -867,7 +802,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
List<Path> ua2ReplicatedData15 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "15"));
-
List<Path> ua3OriginalData05ua1 = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster3FS, new Path(
testDirWithDateSourceTarget + "05/ua3/ua1"));