You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/14 16:01:33 UTC
[3/5] falcon git commit: FALCON-1151 Migrate oozie related methods
from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index f299128..2f5dbd9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -101,7 +101,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -124,7 +124,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
bundles[0].setProcessConcurrency(10);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -148,10 +148,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
//create data for first 5 instances, 6th should be non-materialized
- String bundleId = InstanceUtil.getSequenceBundleID(clusterOC, processName,
+ String bundleId = OozieUtil.getSequenceBundleID(clusterOC, processName,
EntityType.PROCESS, 0);
for(CoordinatorJob c : clusterOC.getBundleJobInfo(bundleId).getCoordinators()) {
List<CoordinatorAction> actions = clusterOC.getCoordJobInfo(c.getId()).getActions();
@@ -192,7 +192,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
@@ -215,7 +215,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z");
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
String startTime = TimeUtil.getTimeWrtSystemTime(1);
String endTime = TimeUtil.getTimeWrtSystemTime(40);
InstancesResult r = prism.getProcessHelper()
@@ -236,7 +236,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -258,7 +258,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 10);
@@ -280,7 +280,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -302,7 +302,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -322,7 +322,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -342,7 +342,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -362,7 +362,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index c65461c..f65f9c9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -102,14 +102,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
start + "&end=2010-01-02T01:16Z");
InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
- List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
@@ -127,7 +127,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -151,7 +151,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -175,7 +175,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -200,14 +200,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
String process = bundles[0].getProcessData();
LOGGER.info("process: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
InstancesResult r = prism.getProcessHelper()
.getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
- List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
prism.getProcessHelper().
getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
@@ -225,14 +225,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
InstancesResult r = prism.getProcessHelper()
.getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
- List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:11Z");
TimeUtil.sleepSeconds(TIMEOUT);
@@ -250,13 +250,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
prism.getProcessHelper().getProcessInstanceKill(processName,
start + "&end=2010-01-02T01:01Z");
- String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0);
+ String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0);
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:01Z");
Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
@@ -275,11 +275,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
String process = bundles[0].getProcessData();
- InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.RUNNING,
+ String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING,
Status.SUCCEEDED).get(0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
.Status.SUCCEEDED, EntityType.PROCESS);
@@ -300,7 +300,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -308,7 +308,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
start + "&end=2010-01-02T01:06Z");
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:06Z");
- Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1),
+ Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1),
CoordinatorAction.Status.SUSPENDED);
}
@@ -323,11 +323,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:11Z&force=true");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
@@ -352,7 +352,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
prism.getProcessHelper().getProcessInstanceRerun(processName,
start + "&end=2010-01-02T01:11Z");
- s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0);
+ s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0);
Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
"instance should have been in WAITING state");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index 3893ffe..a2ff993 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -95,7 +95,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeOnlyEnd() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -118,7 +118,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeResumeSome() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -142,7 +142,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeResumeMany() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -166,7 +166,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
public void testProcessInstanceResumeSingle() throws Exception {
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 2);
@@ -259,7 +259,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeNonSuspended() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -280,7 +280,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeLastInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index 6003ee0..ee1c5e4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -100,7 +100,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
public void getResumedProcessInstance() throws Exception {
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -123,7 +123,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
public void getSuspendedProcessInstance() throws Exception {
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -142,7 +142,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void getRunningProcessInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -183,9 +183,9 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void getSucceededProcessInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitForBundleToReachState(cluster, processName, Job.Status.SUCCEEDED);
+ OozieUtil.waitForBundleToReachState(clusterOC, processName, Job.Status.SUCCEEDED);
InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
InstanceUtil.validateSuccessWOInstances(r);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 635e238..8f177ec 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -120,11 +120,11 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -146,7 +146,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T05:00Z");
AssertUtil.assertSucceeded(r);
@@ -164,7 +164,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0);
@@ -183,12 +183,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z");
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5,
Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0);
List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
@@ -206,12 +206,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusStartAndEnd() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 ,
Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -232,12 +232,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
@@ -288,7 +288,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusReverseDateRange() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
Status.RUNNING, EntityType.PROCESS);
@@ -311,12 +311,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0);
@@ -337,7 +337,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
String process = bundles[0].getProcessData();
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
Status.RUNNING, EntityType.PROCESS, 5);
@@ -364,12 +364,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusOnlyStart() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z");
InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0);
@@ -404,12 +404,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessConcurrency(5);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
+ List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0);
List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index b7fed18..f673314 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -86,7 +86,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -111,7 +111,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -131,7 +131,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -170,7 +170,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
@@ -249,7 +249,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index 40a4ad2..aef32bf 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -91,7 +91,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].submitAndScheduleProcess();
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 1);
@@ -107,7 +107,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
- OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+ OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
}
/**
@@ -131,7 +131,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].submitAndScheduleProcess();
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, true, 1);
@@ -147,7 +147,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
- OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+ OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
}
/**
@@ -175,7 +175,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3);
@@ -191,7 +191,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
- OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
+ OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1);
}
/**
@@ -231,7 +231,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 7);
@@ -248,7 +248,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
- OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
+ OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 0);
}
/*
@@ -271,10 +271,10 @@ public class ProcessLateRerunTest extends BaseTestClass {
Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
String bundleID = bundles.get(0);
LOGGER.info("bundle id: " + bundleID);
- List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+ List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
- missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID);
+ missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
index 74903e1..3988ae9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
@@ -28,6 +28,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.*;
@@ -43,8 +44,8 @@ import java.util.Map;
@Test(groups = "embedded")
public class ProcessLibPathLoadTest extends BaseTestClass {
-
private ColoHelper cluster = servers.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
private FileSystem clusterFS = serverFS.get(0);
private String testDir = cleanAndGetTestDir();
private String aggregateWorkflowDir = testDir + "/aggregator";
@@ -100,9 +101,9 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
public void setRightJarInWorkflowLib() throws Exception {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+ OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
}
/**
@@ -112,13 +113,13 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
* @throws Exception
*/
@Test
- public void setNoJarInWorkflowLibLocaltion() throws Exception {
+ public void setNoJarInWorkflowLibLocation() throws Exception {
HadoopUtil.deleteDirIfExists(aggregateWorkflowDir + "/lib/" + oozieLibName, clusterFS);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitForBundleToReachState(cluster, processName, Status.KILLED);
+ OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.KILLED);
}
/**
@@ -149,7 +150,6 @@ public class ProcessLibPathLoadTest extends BaseTestClass {
output.write(buffer, 0, n);
}
output.close();
-
}
private static boolean isRedirected(Map<String, List<String>> header) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index bc2978f..dc2fc37 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -47,6 +48,7 @@ import java.util.List;
public class ProcessLibPathTest extends BaseTestClass {
private ColoHelper cluster = servers.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
private FileSystem clusterFS = serverFS.get(0);
private String testDir = cleanAndGetTestDir();
private String testLibDir = testDir + "/TestLib";
@@ -102,9 +104,9 @@ public class ProcessLibPathTest extends BaseTestClass {
bundles[0].setProcessWorkflow(workflowDir);
LOGGER.info("processData: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+ OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
}
/**
@@ -122,8 +124,8 @@ public class ProcessLibPathTest extends BaseTestClass {
bundles[0].setProcessWorkflow(workflowDir);
LOGGER.info("processData: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
+ OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
index 3057166..f67cbf8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java
@@ -25,7 +25,6 @@ import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.*;
import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
@@ -44,12 +43,11 @@ import javax.xml.bind.JAXBException;
public class TouchAPIPrismAndServerTest extends BaseTestClass {
private ColoHelper cluster = servers.get(0);
private OozieClient clusterOC = serverOC.get(0);
- private FileSystem clusterFS = serverFS.get(0);
private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
- private String feed;
private static final Logger LOGGER = Logger.getLogger(TouchAPIPrismAndServerTest.class);
private String startTime;
private String endTime;
+ private String clusterName;
@BeforeClass(alwaysRun = true)
public void uploadWorkflow() throws Exception {
@@ -68,6 +66,7 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+ clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
}
@AfterMethod(alwaysRun = true)
@@ -86,22 +85,23 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
public void touchProcessSchedule() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS);
- String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+ String coordId = OozieUtil.getLatestCoordinatorID(clusterOC,
+ bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(),
+ EntityType.PROCESS);
// via prism
ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData());
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
// via server
oldbundleId = bundleId;
- coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData());
- bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
}
@@ -117,27 +117,22 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
public void touchFeedSchedule() throws Exception {
bundles[0].submitAndScheduleFeed();
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
- Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED);
- String oldbundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED);
+ String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED);
+ String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
// via prism
TimeUtil.sleepSeconds(60);
ServiceResponse response = prism.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0));
- String bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
- EntityType.FEED);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
// via server
oldbundleId = bundleId;
- coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, Util.readEntityName(bundles[0].getDataSets().get(0)),
- EntityType.FEED);
+ coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED);
TimeUtil.sleepSeconds(60);
response = cluster.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0));
- bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
- EntityType.FEED);
+ bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED);
Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated.");
validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId);
@@ -157,21 +152,20 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass {
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].submitFeedsScheduleProcess(prism);
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS);
- String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+ String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
// via prism
ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData());
- String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldbundleId, bundleId, "New bundle generated");
validate(response, "Old bundle id: " + coordId);
//via server
oldbundleId = bundleId;
response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData());
- bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldbundleId, bundleId, "New bundle generated");
validate(response, "Old bundle id: " + coordId);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 9799a1c..67cbc52 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -32,8 +32,8 @@ import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HCatUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
@@ -212,9 +212,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
.build()).toString();
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
//This test doesn't wait for replication to succeed.
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
index 53e4777..cd1b538 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HCatUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -113,7 +114,6 @@ public class HCatReplicationTest extends BaseTestClass {
bundles[2].generateUniqueBundle(this);
bundles[2].setClusterInterface(Interfacetype.REGISTRY, cluster3.getClusterHelper()
.getHCatEndpoint());
-
}
@DataProvider
@@ -189,9 +189,7 @@ public class HCatReplicationTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
TimeUtil.sleepSeconds(TIMEOUT);
//check if all coordinators exist
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
//replication should start, wait while it ends
// we will check for 2 instances so that both partitions are copied over.
@@ -206,7 +204,6 @@ public class HCatReplicationTest extends BaseTestClass {
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
}
// make sure oozie changes mentioned FALCON-389 are done on the clusters. Otherwise the test
@@ -285,14 +282,10 @@ public class HCatReplicationTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
TimeUtil.sleepSeconds(TIMEOUT);
//check if all coordinators exist
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
//check if all coordinators exist
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, Util.readEntityName(feed), "REPLICATION"), 1);
//replication should start, wait while it ends
// we will check for 2 instances so that both partitions are copied over.
@@ -318,7 +311,6 @@ public class HCatReplicationTest extends BaseTestClass {
AssertUtil.checkForListSizes(srcData, cluster3TargetData);
}
-
private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
String partitionCol,
String databaseName, String tableName, HCatClient hc) throws
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
index 7da8ef1..67b80d8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
@@ -66,7 +66,6 @@ import java.util.List;
public class EntitySummaryTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(EntitySummaryTest.class);
private ColoHelper cluster1 = servers.get(0);
- private ColoHelper cluster2 = servers.get(1);
private OozieClient cluster1OC = serverOC.get(0);
private OozieClient cluster2OC = serverOC.get(1);
private FileSystem cluster1FS = serverFS.get(0);
@@ -112,7 +111,7 @@ public class EntitySummaryTest extends BaseTestClass {
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
String clusterName = Util.readEntityName(bundles[0].getClusters().get(0));
- List<String> processes = scheduleEntityValidateWaitingInstances(cluster1,
+ List<String> processes = scheduleEntityValidateWaitingInstances(cluster1OC,
bundles[0].getProcessData(), EntityType.PROCESS, clusterName);
//create data for processes to run and wait some time for instances to make progress
@@ -161,8 +160,7 @@ public class EntitySummaryTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def));
//submit and schedule 7 feeds, check that 7 waiting instances are present for each feed
- List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2, feed,
- EntityType.FEED, clusterName);
+ List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2OC, feed, EntityType.FEED, clusterName);
//create data for processes to run and wait some time for instances to make progress
List<String> folders = TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.oozieDateToDate(
@@ -180,9 +178,8 @@ public class EntitySummaryTest extends BaseTestClass {
* Schedules 7 entities and checks that summary reflects info about the most recent 7
* instances of each of them.
*/
- private List<String> scheduleEntityValidateWaitingInstances(ColoHelper cluster, String entity,
- EntityType entityType,
- String clusterName)
+ private List<String> scheduleEntityValidateWaitingInstances(OozieClient oozieClient, String entity,
+ EntityType entityType, String clusterName)
throws AuthenticationException, IOException, URISyntaxException, JAXBException,
OozieClientException, InterruptedException {
String entityName = Util.readEntityName(entity);
@@ -203,8 +200,8 @@ public class EntitySummaryTest extends BaseTestClass {
}
entity = entityMerlin.toString();
AssertUtil.assertSucceeded(helper.submitAndSchedule(entity));
- InstanceUtil.waitTillInstancesAreCreated(cluster, entity, 0);
- InstanceUtil.waitTillInstanceReachState(cluster.getClusterHelper().getOozieClient(),
+ InstanceUtil.waitTillInstancesAreCreated(oozieClient, entity, 0);
+ InstanceUtil.waitTillInstanceReachState(oozieClient,
uniqueName, 7, CoordinatorAction.Status.WAITING, entityType);
//check that summary shows recent (i) number of feeds and their instances
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
index 64a7e2e..6d41f9e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.response.lineage.VerticesResult;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.GraphAssert;
import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
@@ -37,6 +36,7 @@ import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -55,6 +55,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass {
private ColoHelper cluster = servers.get(0);
private FileSystem clusterFS = serverFS.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
private LineageHelper lineageHelper;
private String baseTestHDFSDir = cleanAndGetTestDir();
private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
@@ -104,7 +105,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass {
outputFeedName = bundles[0].getOutputFeedNameFromBundle();
Job.Status status = null;
for (int i = 0; i < 20; i++) {
- status = InstanceUtil.getDefaultCoordinatorStatus(cluster, bundles[0].getProcessName(), 0);
+ status = OozieUtil.getDefaultCoordinatorStatus(clusterOC, bundles[0].getProcessName(), 0);
if (status == Job.Status.SUCCEEDED || status == Job.Status.KILLED) {
break;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 148d6ea..8ef6bb6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -123,7 +123,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
//submit and schedule feed
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0);
InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 12,
CoordinatorAction.Status.WAITING, EntityType.FEED);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
index 66d1886..93e9a3e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
@@ -77,7 +77,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
bundles[0].setProcessConcurrency(3);
bundles[0].submitAndScheduleProcess();
processName = bundles[0].getProcessName();
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
//create data for processes to run and wait some time for instances to make progress
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
index a72d339..99f58e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
@@ -130,18 +130,17 @@ public class FeedDelayTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
//check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), "REPLICATION"), 1);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1);
//Finding bundleId of replicated instance on target
- String bundleId = InstanceUtil.getLatestBundleID(cluster2, Util.readEntityName(feed), EntityType.FEED);
+ String bundleId = OozieUtil.getLatestBundleID(cluster2OC, Util.readEntityName(feed), EntityType.FEED);
//Finding startTime of replicated instance on target
- String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2, feed, 0);
+ String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2OC, feed, 0);
String startTimeExpected = getStartTime(sourceStartTime, targetStartTime, new Frequency(sourceDelay), flag);
- List<String> missingDep = getAndCreateDependencies(cluster1, cluster1FS, cluster2, bundleId);
+ List<String> missingDep = getAndCreateDependencies(cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId);
List<String> qaDep = new ArrayList<String>();
if (flag) {
@@ -176,25 +175,24 @@ public class FeedDelayTest extends BaseTestClass {
};
}
- private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1,
- ColoHelper prismHelper2, String bundleId) throws OozieClientException, IOException {
- List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+ private List<String> getAndCreateDependencies(FileSystem sourceFS, String sourcePrefix, OozieClient targetOC,
+ String bundleId) throws OozieClientException, IOException {
+ List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
LOGGER.info("sleeping...");
- missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+ missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
// Creating missing dependencies
- HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies);
+ HadoopUtil.createFolders(sourceFS, sourcePrefix, missingDependencies);
//Adding data to empty folders
for (String location : missingDependencies) {
LOGGER.info("Transferring data to : " + location);
- HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+ HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
}
-
return missingDependencies;
}