You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/09 15:24:29 UTC
[2/3] falcon git commit: FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index 13a9776..76d033c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@ -93,7 +93,8 @@ public class NewRetryTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
- bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessValidity(TimeUtil.dateToOozieDate(startDate.toDate()),
+ TimeUtil.dateToOozieDate(endDate.toDate()));
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
@@ -135,7 +136,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -150,7 +151,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper()
.update((bundles[0].getProcessData()), bundles[0].getProcessData());
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -183,7 +184,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -205,7 +206,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -242,7 +243,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) {
TimeUtil.sleepSeconds(10);
@@ -260,7 +261,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -295,7 +296,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) {
@@ -316,7 +317,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -350,7 +351,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -360,11 +361,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(prism.getProcessHelper()
- .update(Util.readEntityName(bundles[0].getProcessData()),
+ .update(bundles[0].getProcessName(),
null).getMessage()
.contains("updated successfully"), "process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -398,7 +399,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -408,11 +409,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(
- prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().update(bundles[0].getProcessName(),
bundles[0].getProcessData()).getMessage()
.contains("updated successfully"), "process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -449,7 +450,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -460,12 +461,12 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(prism.getProcessHelper()
- .update(Util.readEntityName(bundles[0].getProcessData()),
+ .update(bundles[0].getProcessName(),
bundles[0].getProcessData()).getMessage()
.contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getProcessData()),
+ .getLatestBundleID(cluster, bundles[0].getProcessName(),
EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -503,7 +504,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -513,11 +514,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertFalse(
- prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData())
+ prism.getProcessHelper().update(bundles[0].getProcessName()
, bundles[0].getProcessData()).getMessage().contains("updated successfully"),
"process was updated successfully!!!");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -555,7 +556,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
//now to validate all failed instances to check if they were retried or not.
validateRetry(clusterOC, bundleId,
@@ -595,7 +596,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -608,7 +609,7 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("now firing user reruns:");
for (int i = 0; i < 1; i++) {
prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ .getProcessInstanceRerun(bundles[0].getProcessName(),
"?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z"
+ "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z");
}
@@ -648,7 +649,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()),
+ bundles[0].getProcessName(),
EntityType.PROCESS).get(0);
//now to validate all failed instances to check if they were retried or not.
@@ -659,7 +660,7 @@ public class NewRetryTest extends BaseTestClass {
DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId);
InstancesResult piResult = prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ .getProcessInstanceRerun(bundles[0].getProcessName(),
"?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z"
+ "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1])
.replace("/", "T") + "Z");
@@ -702,7 +703,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
for (int i = 0; i < 10 && dates == null; ++i) {
@@ -802,7 +803,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
for (int i = 0; i < 10 && dates == null; ++i) {
@@ -878,7 +879,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
validateRetry(clusterOC, bundleId,
(bundles[0].getProcessObject().getRetry().getAttempts()) / 2);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 59a701d..61c076b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@ -83,10 +83,9 @@ public class NoOutputProcessTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(inputPath);
bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
process.setOutputs(null);
process.setLateProcess(null);
- bundles[0].setProcessData(process.toString());
bundles[0].submitFeedsScheduleProcess(prism);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
index 8cf2862..d7331cf 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
@@ -30,7 +30,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
@@ -103,7 +102,7 @@ public class ProcessFrequencyTest extends BaseTestClass {
TimeUtil.oozieDateToDate(startDate));
HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
- final String processName = Util.readEntityName(bundles[0].getProcessData());
+ final String processName = bundles[0].getProcessName();
//InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
index 48cb59b..56e1474 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
@@ -119,11 +119,11 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
- String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+ String prefix = feed01.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
- prefix = InstanceUtil.getFeedPrefix(feed02.toString());
+ prefix = feed02.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
@@ -179,7 +179,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
// String processEndTime = InstanceUtil.getTimeWrtSystemTime(20);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
process.clearProcessCluster();
process.addProcessCluster(
new ProcessMerlin.ProcessClusterBuilder(
@@ -192,8 +192,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
.withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
TimeUtil.addMinsToTime(processStartTime, 45))
.build());
- process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), feed02.getName(),
- feed02.getName()));
+ process.addInputFeed(feed02.getName(), feed02.getName());
//submit and schedule process
prism.getProcessHelper().submitAndSchedule(process.toString());
@@ -204,42 +203,40 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
Status.RUNNING, EntityType.PROCESS);
- final String processName = Util.readEntityName(bundles[0].getProcessData());
- InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(
- processName, "?start=" + processStartTime
- + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
+ InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
+ "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceResume(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceResume(process.getName(),
"?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 16)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(processName,
+ responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(process.getName(),
"?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceRerun(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceRerun(process.getName(),
"?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index 34dfce3..f299128 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
@@ -83,7 +82,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index f558cc5..3893ffe 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
@@ -79,7 +78,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessConcurrency(6);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index c9334eb..6003ee0 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -83,7 +82,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 58936a7..635e238 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -27,7 +27,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
@@ -97,7 +96,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index 26348bd..b7fed18 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -18,7 +18,6 @@
package org.apache.falcon.regression;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -29,7 +28,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
@@ -68,7 +66,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
@@ -115,8 +113,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, new ProcessMerlin(bundles[0]
- .getProcessData()).getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z");
AssertUtil.assertSucceeded(r);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index b41cf05..40a4ad2 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -85,8 +85,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(2);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].submitAndScheduleProcess();
@@ -126,8 +125,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(2);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].submitAndScheduleProcess();
@@ -166,8 +164,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].setProcessConcurrency(2);
@@ -216,17 +213,17 @@ public class ProcessLateRerunTest extends BaseTestClass {
// Increase the window of input for process
bundles[0].setDatasetInstances(startInstance, endInstance);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
- Input tempFeed = processMerlin.getInputs().getInputs().get(0);
+ ProcessMerlin process = bundles[0].getProcessObject();
+ String inputName = process.getFirstInputName();
+ Input tempFeed = process.getInputs().getInputs().get(0);
Input gateInput = new Input();
gateInput.setName("Gate");
gateInput.setFeed(tempFeed.getFeed());
gateInput.setEnd("now(0,1)");
gateInput.setStart("now(0,1)");
- processMerlin.getInputs().getInputs().add(gateInput);
- bundles[0].setProcessData(processMerlin.toString());
+ process.getInputs().getInputs().add(gateInput);
+ bundles[0].setProcessData(process.toString());
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
index cd7eba4..c73140d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
@@ -80,7 +80,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleValidProcessSLA() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
new Frequency("6", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -95,7 +95,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
new Frequency("3", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -110,7 +110,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
new Frequency("2", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -131,7 +131,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
new Frequency("4", Frequency.TimeUnit.hours));
@@ -147,7 +147,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
new Frequency("4", Frequency.TimeUnit.hours));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
index 9886d76..ca612b8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
@@ -236,7 +236,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
prism.getFeedHelper().submitEntity(feed);
prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
- ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setWorkflow(null);
ServiceResponse response = prism.getProcessHelper().validateEntity(processObj.toString());
AssertUtil.assertFailed(response);
@@ -253,7 +253,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
prism.getFeedHelper().submitEntity(feed);
prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
- ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setWorkflow(null);
ServiceResponse response = cluster.getProcessHelper().validateEntity(processObj.toString());
AssertUtil.assertFailed(response);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
index f9fcf8d..f58b0f6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
@@ -67,20 +67,20 @@ public class EntitiesPatternSearchTest extends BaseTestClass {
//submit different clusters, feeds and processes
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
ClusterMerlin cluster = bundles[0].getClusterElement();
String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
String processNamePrefix = bundles[0].getProcessName() + '-';
String feedNamePrefix = bundles[0].getInputFeedNameFromBundle() + '-';
- List randomeNames = getPatternName();
- for (int i = 0; i < randomeNames.size(); i++) {
- process.setName(processNamePrefix + randomeNames.get(i));
+ List randomNames = getPatternName();
+ for (Object randomName : randomNames) {
+ process.setName(processNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString()));
- feed.setName(feedNamePrefix + randomeNames.get(i));
+ feed.setName(feedNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
- cluster.setName(clusterNamePrefix + randomeNames.get(i));
+ cluster.setName(clusterNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster.toString()));
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
index 13b3b88..3ae44e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
@@ -82,7 +82,7 @@ public class ListEntitiesTest extends BaseTestClass {
//submit 10 different clusters, feeds and processes
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
ClusterMerlin cluster = bundles[0].getClusterElement();
String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
String processNamePrefix = bundles[0].getProcessName() + '-';
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
index 202298e..7d05d6b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -253,8 +253,7 @@ public class HCatProcessTest extends BaseTestClass {
feedObj.setName(inputFeed2Name);
feedObj.getTable().setUri(inputTableUri2);
- String inputFeed2 = feedObj.toString();
- bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+ bundles[0].addInputFeedToBundle("inputData2", feedObj);
String outputTableUri =
"catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -347,7 +346,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj = new FeedMerlin(outputFeed1);
feedObj.setName(outputFeed2Name);
feedObj.getTable().setUri(outputTableUri2);
- bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0);
+ bundles[0].addOutputFeedToBundle("outputData2", feedObj);
bundles[0].setProcessValidity(startDate, endDate);
bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
@@ -433,8 +432,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj = new FeedMerlin(inputFeed1);
feedObj.setName(inputFeed2Name);
feedObj.getTable().setUri(inputTableUri2);
- String inputFeed2 = feedObj.toString();
- bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+ bundles[0].addInputFeedToBundle("inputData2", feedObj);
String outputTableUri =
"catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -448,8 +446,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj2 = new FeedMerlin(outputFeed1);
feedObj2.setName(outputFeed2Name);
feedObj2.getTable().setUri(outputTableUri2);
- String outputFeed2 = feedObj2.toString();
- bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0);
+ bundles[0].addOutputFeedToBundle("outputData2", feedObj2);
bundles[0].setProcessValidity(startDate, endDate);
bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 4466c13..3f6dc66 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -130,8 +130,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -139,16 +138,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 5, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
- LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
+ LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess.toString()));
//now to update
while (Util
.parseResponse(prism.getProcessHelper()
- .update((bundles[1].getProcessData()), updatedProcess))
+ .update((bundles[1].getProcessData()), updatedProcess.toString()))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didn't SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
@@ -186,7 +184,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
@@ -256,7 +254,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(25);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -271,14 +269,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to update
AssertUtil.assertPartial(prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency);
+ Assert.assertEquals(process.getWorkflow().getPath(), workflowPath);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(),
- workflowPath2);
+ Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), workflowPath2);
Util.startService(cluster3.getProcessHelper());
dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
@@ -296,13 +293,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("WARNING: update did not succeed, retrying ");
TimeUtil.sleepSeconds(20);
}
- prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- workflowPath2);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), workflowPath2);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -336,7 +330,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -344,16 +338,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 7, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("7", TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
-
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
@@ -361,7 +353,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
- new ProcessMerlin(updatedProcess).getFrequency());
+ updatedProcess.getFrequency());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -376,7 +368,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String originalProcessData = bundles[1].getProcessData();
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -410,7 +402,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -443,7 +435,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
// future : should be verified using cord xml
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -452,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
int statusCount = InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS);
if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
@@ -460,7 +452,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
Assert.assertNotNull(status,
"status must not be null!");
TimeUtil.sleepSeconds(30);
@@ -494,7 +486,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -577,7 +569,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
@@ -610,7 +602,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -619,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
@@ -628,7 +620,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
@@ -670,7 +662,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -707,7 +699,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Job.Status status =
OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -716,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
@@ -725,13 +717,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
+ bundles[1].getProcessName(), EntityType.PROCESS),
oldNominalTimes, bundles[1].getProcessData(), false,
true
);
@@ -771,7 +763,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -793,13 +785,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(10);
}
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- aggregator1Path);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
@@ -839,7 +828,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -867,13 +856,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- aggregator1Path);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
@@ -913,7 +899,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(20);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -965,7 +951,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1090,7 +1076,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -1153,7 +1139,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -1220,7 +1206,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes =
@@ -1228,15 +1214,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 5, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
@@ -1272,7 +1257,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1280,18 +1265,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 1, TimeUnit.months));
- updatedProcess = InstanceUtil
- .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
- endTime);
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("1", TimeUnit.months));
+ updatedProcess.setValidity(TimeUtil.getTimeWrtSystemTime(10), endTime);
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
@@ -1316,7 +1298,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -1351,7 +1333,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
@@ -1393,7 +1375,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getValidity().getEnd()));
Assert.assertEquals(InstanceUtil
.getProcessInstanceList(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
+ bundles[1].getProcessName(), EntityType.PROCESS)
.size(), getExpectedNumberOfWorkflowInstances(newStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
@@ -1409,7 +1391,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1476,8 +1458,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//update workflow.xml
hadoopFileEditor = new HadoopFileEditor(cluster1FS);
- hadoopFileEditor.edit(new ProcessMerlin(b
- .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
+ hadoopFileEditor.edit(b.getProcessObject().getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
"<!-- some comment -->");
//update
@@ -1553,7 +1534,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
throws Exception {
while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
+ bundle.getProcessName(), EntityType.PROCESS) != state) {
//keep waiting
TimeUtil.sleepSeconds(10);
}
@@ -1566,7 +1547,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
while (coord.getStatus() != state) {
TimeUtil.sleepSeconds(10);
coord = getDefaultOozieCoord(coloHelper, InstanceUtil
- .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
+ .getLatestBundleID(coloHelper, bundle.getProcessName(),
EntityType.PROCESS));
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index c9e373e..39f0268 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -93,7 +93,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
LOGGER.info(Util.prettyPrintXml(process.toString()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -125,7 +125,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- String processName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -163,7 +163,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- String processName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -232,11 +232,11 @@ public class OptionalInputTest extends BaseTestClass {
LOGGER.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- LOGGER.info(Util.prettyPrintXml(process.toString()));
+
+ LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
- InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, bundles[0].getProcessName(),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
@@ -262,10 +262,8 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- LOGGER.info(Util.prettyPrintXml(process.toString()));
- String processName = process.getName();
- LOGGER.info(Util.prettyPrintXml(process.toString()));
+ LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String processName = bundles[0].getProcessName();
bundles[0].submitAndScheduleBundle(prism, true);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
@@ -278,11 +276,9 @@ public class OptionalInputTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(process);
- processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
- bundles[0].setProcessData(processMerlin.toString());
- bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
- process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
+ process.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
+ process.setProcessInputStartEnd("now(0,-10)", "now(0,0)");
LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString()));
prism.getProcessHelper().update(process.toString(), process.toString());
@@ -319,25 +315,22 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(4);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- String processName = process.getName();
+ ProcessMerlin process = bundles[0].getProcessObject();
LOGGER.info(Util.prettyPrintXml(process.toString()));
bundles[0].submitAndScheduleBundle(prism, true);
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
- inputPath + "/input1/", dataDates);
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
- 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ inputPath + "/input1/", dataDates);
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+ 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(process);
- processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
- bundles[0].setProcessData(processMerlin.toString());
- process = new ProcessMerlin(bundles[0].getProcessData());
+ process.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
+ bundles[0].setProcessData(process.toString());
//delete all input data
HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
@@ -347,7 +340,7 @@ public class OptionalInputTest extends BaseTestClass {
prism.getProcessHelper().update(process.toString(), process.toString());
//from now on ... it should wait of input0 also
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
}