You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/09 15:24:28 UTC
[1/3] falcon git commit: FALCON-1135 Migrate methods related to
*Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by
Ruslan Ostafiychuk
Repository: falcon
Updated Branches:
refs/heads/master 8c7eaa69f -> 395675fb0
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index 97d4e67..c6f72cc 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -201,38 +201,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2012-10-01T12:10Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("")
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2012-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("")
.withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("")
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source "
+ "is blank");
@@ -255,42 +255,39 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:00Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(100000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2099-10-01T12:10Z")
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(100000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2099-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(100000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- r = prism.getFeedHelper().schedule(feed);
+ r = prism.getFeedHelper().schedule(feed.toString());
AssertUtil.assertSucceeded(r);
TimeUtil.sleepSeconds(15);
@@ -302,19 +299,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/",
testFile2);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
"REPLICATION"), 1);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
@@ -356,53 +353,52 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:00Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2099-10-01T12:10Z")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2099-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed);
+ ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
"REPLICATION"), 1);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+ .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+ .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
"RETENTION"), 1);
@@ -449,48 +445,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, testBaseDir3 + MINUTE_DATE_PATTERN);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(testBaseDir3 + MINUTE_DATE_PATTERN);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2012-10-01T12:10Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2012-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
TimeUtil.sleepSeconds(15);
- InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 3,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 3,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
//check if data has been replicated correctly
@@ -550,38 +546,37 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2012-10-01T12:10Z")
.withClusterType(ClusterType.SOURCE)
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2012-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ .build());
//clean target if old data exists
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
AssertUtil.assertFailed(r, "Submission of feed should have failed.");
Assert.assertTrue(r.getMessage().contains(
"Partition expression has to be specified for cluster "
@@ -609,49 +604,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed,
- testBaseDir1 + MINUTE_DATE_PATTERN);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN);
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(10000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2012-10-01T12:11Z")
.withClusterType(ClusterType.TARGET)
.withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(10000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2012-10-01T12:26Z")
.withClusterType(ClusterType.TARGET)
.withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(10000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("${cluster.colo}")
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
TimeUtil.sleepSeconds(15);
- InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
//check if data has been replicated correctly
@@ -716,47 +708,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA1 = "2012-10-01T12:00Z";
String startTimeUA2 = "2012-10-01T12:00Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2099-10-01T12:10Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDirServer1Source + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2099-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withDataLocation(testBaseDir2 + "/replicated" + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build())
- .toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- r = prism.getFeedHelper().schedule(feed);
+ r = prism.getFeedHelper().schedule(feed.toString());
AssertUtil.assertSucceeded(r);
TimeUtil.sleepSeconds(15);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
//check if data has been replicated correctly
@@ -804,49 +795,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA1 = "2012-10-01T12:05Z";
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, testBaseDir1 + MINUTE_DATE_PATTERN);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN);
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2099-10-01T12:10Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN + "/")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2099-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN + "/")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("${cluster.colo}")
.withDataLocation(testBaseDir4 + MINUTE_DATE_PATTERN + "/")
- .build())
- .toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertSucceeded(r);
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
TimeUtil.sleepSeconds(15);
- InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
//check if data has been replicated correctly
@@ -905,38 +895,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
String startTimeUA1 = "2012-10-01T12:05Z";
String startTimeUA2 = "2012-10-01T12:10Z";
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA1, "2012-10-01T12:10Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("")
.withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTimeUA2, "2012-10-01T12:25Z")
.withClusterType(ClusterType.TARGET)
.withPartition("")
.withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
.withPartition("")
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+ ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
TimeUtil.sleepSeconds(10);
AssertUtil.assertFailed(r, "is defined more than once for feed");
Assert.assertTrue(r.getMessage().contains("is defined more than once for feed"));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
index e1a96f3..6f60bb8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
@@ -101,8 +101,8 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(inputPath);
Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
- String feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.clearFeedClusters();
// use the colo string here so that the test works in embedded and distributed mode.
String postFix = "/US/" + cluster2Colo;
@@ -118,62 +118,62 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(-30);
- feed = FeedMerlin.fromString(feed)
+ feed
.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("hours(10)", ActionType.DELETE)
.withValidity(startTime, TimeUtil.addMinsToTime(startTime, 85))
.withClusterType(ClusterType.SOURCE)
.withPartition("US/${cluster.colo}")
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("hours(10)", ActionType.DELETE)
.withValidity(TimeUtil.addMinsToTime(startTime, 20),
TimeUtil.addMinsToTime(startTime, 105))
.withClusterType(ClusterType.TARGET)
- .build()).toString();
+ .build());
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
.withRetention("hours(10)", ActionType.DELETE)
.withValidity(TimeUtil.addMinsToTime(startTime, 40),
TimeUtil.addMinsToTime(startTime, 130))
.withClusterType(ClusterType.SOURCE)
.withPartition("UK/${cluster.colo}")
- .build()).toString();
+ .build());
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed));
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
//change feed location path
- feed = InstanceUtil.setFeedFilePath(feed, alternativeInputPath);
+ feed.setFilePath(alternativeInputPath);
- LOGGER.info("updated feed: " + Util.prettyPrintXml(feed));
+ LOGGER.info("updated feed: " + Util.prettyPrintXml(feed.toString()));
//update feed
- AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed, feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
- Util.readEntityName(feed),
+ Util.readEntityName(feed.toString()),
"REPLICATION"), 0);
Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
- Util.readEntityName(feed),
+ Util.readEntityName(feed.toString()),
"RETENTION"), 2);
Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
- Util.readEntityName(feed),
+ Util.readEntityName(feed.toString()),
"REPLICATION"), 0);
Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
- Util.readEntityName(feed),
+ Util.readEntityName(feed.toString()),
"RETENTION"), 2);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()),
"REPLICATION"), 4);
Assert.assertEquals(
- InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+ InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()),
"RETENTION"), 2);
}
@@ -209,11 +209,11 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
feed02.setFeedPathValue(baseTestDir + "/feed02" + MINUTE_DATE_PATTERN);
//generate data in both the colos ua1 and ua3
- String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+ String prefix = feed01.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
HadoopUtil.lateDataReplenish(cluster1FS, 25, 1, prefix, null);
- prefix = InstanceUtil.getFeedPrefix(feed02.toString());
+ prefix = feed02.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
HadoopUtil.lateDataReplenish(cluster3FS, 25, 1, prefix, null);
@@ -290,9 +290,9 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
new ProcessMerlin.ProcessClusterBuilder(
Util.readEntityName(bundles[2].getClusters().get(0)))
.withValidity(processStartTime, processEndTime)
- .build());
- process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(),
- feed02.toString(), feed02.getName()));
+ .build()
+ );
+ process.addInputFeed(feed02.getName(), feed02.getName());
//submit and schedule process
AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString()));
@@ -305,7 +305,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(serverOC.get(2), process.getName(), 1,
Status.RUNNING, EntityType.PROCESS, timeout);
- feed01 = new FeedMerlin(InstanceUtil.setFeedFilePath(feed01.toString(), alternativeInputPath));
+ feed01.setFilePath(alternativeInputPath);
LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01.toString()));
AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01.toString(), feed01.toString()));
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
index 483c281..d855e33 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -125,7 +125,7 @@ public class PrismFeedUpdateTest extends BaseTestClass {
feed01.setFeedPathValue(baseTestDir + "/feed01" + MINUTE_DATE_PATTERN);
//generate data in both the colos cluster1colo and cluster2colo
- String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+ String prefix = feed01.getFeedPrefix();
String startTime = TimeUtil.getTimeWrtSystemTime(-40);
System.out.println("Start time = " + startTime);
HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
@@ -183,9 +183,9 @@ public class PrismFeedUpdateTest extends BaseTestClass {
.build());
//get 2nd process
- ProcessMerlin process02 = new ProcessMerlin(InstanceUtil
- .setProcessName(process01.toString(), this.getClass().getSimpleName()
- + "-zeroInputProcess" + new Random().nextInt()));
+ ProcessMerlin process02 = new ProcessMerlin(process01);
+ process02.setName(this.getClass().getSimpleName() + "-zeroInputProcess"
+ + new Random().nextInt());
List<String> feed = new ArrayList<String>();
feed.add(outputFeed.toString());
process02.setProcessFeeds(feed, 0, 0, 1);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
index f1ff8fe..e2f01c5 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
@@ -106,14 +106,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
- compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
- compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
//UA2:
compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -157,7 +157,7 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
compareDataStoresForEquality(initialPrismStore, finalPrismStore);
compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
@@ -177,16 +177,16 @@ public class PrismProcessDeleteTest extends BaseTestClass {
HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
- compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
- clusterName);
+ processName);
compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
- compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
- clusterName);
+ processName);
} catch (Exception e) {
LOGGER.info(e.getMessage());
throw new TestNGException(e.getMessage());
@@ -290,18 +290,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA2:
compareDataStoresForEquality(initialUA2Store, finalUA2Store);
compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
//UA1:
- compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
- compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
} catch (Exception e) {
LOGGER.info(e.getMessage());
@@ -455,14 +455,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
- compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
- compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
//UA2:
compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -506,14 +506,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
- compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
- compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
//UA2:
compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -559,14 +559,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
- compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
- compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
//UA2:
compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -672,7 +672,7 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
//prism:
compareDataStoresForEquality(initialPrismStore, finalPrismStore);
compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
@@ -694,13 +694,13 @@ public class PrismProcessDeleteTest extends BaseTestClass {
compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
- compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
- clusterName);
+ processName);
- compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
- clusterName);
+ processName);
} catch (Exception e) {
e.printStackTrace();
@@ -752,18 +752,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
compareDataStoresForEquality(initialUA1Store, finalUA1Store);
compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
//UA2:
- compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
- compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
} catch (Exception e) {
e.printStackTrace();
throw new TestNGException(e.getMessage());
@@ -817,18 +817,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundle.getProcessData());
+ String processName = bundle.getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
compareDataStoresForEquality(initialUA1Store, finalUA1Store);
compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
//UA2:
- compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
- compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
} catch (Exception e) {
e.printStackTrace();
throw new TestNGException(e.getMessage());
@@ -876,18 +876,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
//now ensure that data has been deleted from all cluster store and is present in the
// cluster archives
- String clusterName = Util.readEntityName(bundles[1].getProcessData());
+ String processName = bundles[1].getProcessName();
//prism:
- compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
- compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+ compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
//UA1:
compareDataStoresForEquality(initialUA1Store, finalUA1Store);
compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
//UA2:
- compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
- compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+ compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
Util.startService(cluster2.getClusterHelper());
@@ -897,18 +897,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
- clusterName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
- compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
- clusterName);
+ processName);
- compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
- clusterName);
+ processName);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
index 03f380d..4555221 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -335,9 +335,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
- Util.readEntityName(bundles[0].getProcessData()), 0);
+ bundles[0].getProcessName(), 0);
InstanceUtil.waitTillInstanceReachState(cluster1OC,
- Util.readEntityName(bundles[0].getProcessData()), 2,
+ bundles[0].getProcessName(), 2,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
InstanceUtil.waitForBundleToReachState(cluster1,
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
index dfb405f..4aa7189 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -179,7 +179,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
//reschedule trial
AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(bundles[0].getProcessData()));
Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+ bundles[0].getProcessName(), EntityType.PROCESS).size(), 1);
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
index 7bc4b5b..f90a76b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -18,7 +18,6 @@
package org.apache.falcon.regression.prism;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -160,8 +159,7 @@ public class PrismSubmitTest extends BaseTestClass {
List<String> afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
- AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
Util.startService(cluster1.getClusterHelper());
@@ -179,8 +177,7 @@ public class PrismSubmitTest extends BaseTestClass {
afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
- AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), -1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), -1);
AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
}
@@ -219,10 +216,8 @@ public class PrismSubmitTest extends BaseTestClass {
afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
- AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
- AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, bundles[0].getProcessName(), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
}
@@ -566,8 +561,7 @@ public class PrismSubmitTest extends BaseTestClass {
afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
- AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
index 272ac3b..f407601 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
@@ -18,7 +18,6 @@
package org.apache.falcon.regression.prism;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Property;
@@ -112,8 +111,7 @@ public class ProcessPartitionExpVariableTest extends BaseTestClass {
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir
+ "/input1/", dataDates);
- InstanceUtil.waitTillInstanceReachState(clusterOC,
- new ProcessMerlin(bundles[0].getProcessData()).getName(), 2,
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
index 1d65d12..2a73538 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
@@ -23,7 +23,6 @@ import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
@@ -76,8 +75,9 @@ public class RescheduleKilledProcessTest extends BaseTestClass {
public void rescheduleKilledProcess() throws Exception {
String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
String processEndTime = TimeUtil.getTimeWrtSystemTime(6);
- ProcessMerlin process = new ProcessMerlin(InstanceUtil.setProcessName(bundles[0].getProcessData(),
- this.getClass().getSimpleName() + "zeroInputProcess" + new Random().nextInt()));
+ ProcessMerlin process = bundles[0].getProcessObject();
+ process.setName(this.getClass().getSimpleName() + "-zeroInputProcess"
+ + new Random().nextInt());
List<String> feed = new ArrayList<String>();
feed.add(bundles[0].getOutputFeedFromBundle());
process.setProcessFeeds(feed, 0, 0, 1);
@@ -87,7 +87,8 @@ public class RescheduleKilledProcessTest extends BaseTestClass {
new ProcessMerlin.ProcessClusterBuilder(
Util.readEntityName(bundles[0].getClusters().get(0)))
.withValidity(processStartTime, processEndTime)
- .build());
+ .build()
+ );
bundles[0].setProcessData(process.toString());
bundles[0].submitFeedsScheduleProcess(prism);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
index 7e4422b..8d7ac7f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
@@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -159,7 +158,7 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ .getProcessInstanceKill(bundles[0].getProcessName(),
"?start=2010-01-02T01:05Z&end=2010-01-02T01:11Z");
InstanceUtil
.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index 0cc0d6e..14d76af 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -126,9 +126,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
//update frequency
- Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes);
- String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
- ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null);
+ FeedMerlin updatedFeed = new FeedMerlin(feed);
+ updatedFeed.setFrequency(new Frequency("21", Frequency.TimeUnit.minutes));
+ ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed.toString(), "abc", null);
Assert.assertTrue(r.getMessage()
.contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
}
@@ -182,9 +182,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
//update frequency
- Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes);
- String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
- r = prism.getFeedHelper().update(feed, updatedFeed,
+ FeedMerlin updatedFeed = new FeedMerlin(feed);
+ updatedFeed.setFrequency(new Frequency("7", Frequency.TimeUnit.minutes));
+ r = prism.getFeedHelper().update(feed, updatedFeed.toString(),
TimeUtil.getTimeWrtSystemTime(-10000), null);
AssertUtil.assertSucceeded(r);
InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
index 4c54409..040057e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@ -141,7 +141,7 @@ public class ProcessUITest extends BaseUITestClass {
Generator.getHadoopPathGenerator(feedInputPath, MINUTE_DATE_PATTERN));
int j = 0;
for (FeedMerlin feed : inputFeeds) {
- bundles[0].addInputFeedToBundle("inputFeed" + j, feed.toString(), j++);
+ bundles[0].addInputFeedToBundle("inputFeed" + j++, feed);
}
outputFeeds = LineageApiTest.generateFeeds(numOutputFeeds, outputMerlin,
@@ -150,7 +150,7 @@ public class ProcessUITest extends BaseUITestClass {
Generator.getHadoopPathGenerator(feedOutputPath, MINUTE_DATE_PATTERN));
j = 0;
for (FeedMerlin feed : outputFeeds) {
- bundles[0].addOutputFeedToBundle("outputFeed" + j, feed.toString(), j++);
+ bundles[0].addOutputFeedToBundle("outputFeed" + j++, feed);
}
AssertUtil.assertSucceeded(bundles[0].submitBundle(prism));
[3/3] falcon git commit: FALCON-1135 Migrate methods related to
*Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by
Ruslan Ostafiychuk
Posted by ro...@apache.org.
FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/395675fb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/395675fb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/395675fb
Branch: refs/heads/master
Commit: 395675fb0f4096dc44f1e325681536328956440e
Parents: 8c7eaa6
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Tue Feb 3 15:18:27 2015 +0200
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Thu Apr 9 16:22:05 2015 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../regression/Entities/ClusterMerlin.java | 26 ++
.../falcon/regression/Entities/FeedMerlin.java | 46 ++++
.../regression/Entities/ProcessMerlin.java | 145 +++++++++++
.../falcon/regression/core/bundle/Bundle.java | 232 +++---------------
.../falcon/regression/core/util/BundleUtil.java | 2 +-
.../regression/core/util/InstanceUtil.java | 102 --------
.../falcon/regression/AuthorizationTest.java | 126 +++++-----
.../regression/ELExpCurrentAndLastWeekTest.java | 3 +-
.../falcon/regression/ELValidationsTest.java | 3 +-
.../falcon/regression/ExternalFSTest.java | 24 +-
.../regression/FeedInstanceListingTest.java | 2 +-
.../falcon/regression/FeedLateRerunTest.java | 20 +-
.../falcon/regression/FeedReplicationTest.java | 162 ++++++-------
.../apache/falcon/regression/LogMoverTest.java | 10 +-
.../apache/falcon/regression/NewRetryTest.java | 59 ++---
.../falcon/regression/NoOutputProcessTest.java | 3 +-
.../falcon/regression/ProcessFrequencyTest.java | 3 +-
.../ProcessInstanceColoMixedTest.java | 27 +--
.../regression/ProcessInstanceKillsTest.java | 3 +-
.../regression/ProcessInstanceResumeTest.java | 3 +-
.../regression/ProcessInstanceRunningTest.java | 3 +-
.../regression/ProcessInstanceStatusTest.java | 3 +-
.../regression/ProcessInstanceSuspendTest.java | 8 +-
.../falcon/regression/ProcessLateRerunTest.java | 19 +-
.../falcon/regression/ProcessSLATest.java | 10 +-
.../ValidateAPIPrismAndServerTest.java | 4 +-
.../entity/EntitiesPatternSearchTest.java | 12 +-
.../regression/entity/ListEntitiesTest.java | 2 +-
.../falcon/regression/hcat/HCatProcessTest.java | 11 +-
.../prism/NewPrismProcessUpdateTest.java | 151 +++++-------
.../regression/prism/OptionalInputTest.java | 45 ++--
.../PrismFeedReplicationPartitionExpTest.java | 240 +++++++++----------
.../prism/PrismFeedReplicationUpdateTest.java | 52 ++--
.../regression/prism/PrismFeedUpdateTest.java | 8 +-
.../prism/PrismProcessDeleteTest.java | 110 ++++-----
.../prism/PrismProcessScheduleTest.java | 4 +-
.../regression/prism/PrismProcessSnSTest.java | 2 +-
.../regression/prism/PrismSubmitTest.java | 16 +-
.../prism/ProcessPartitionExpVariableTest.java | 4 +-
.../prism/RescheduleKilledProcessTest.java | 9 +-
.../RescheduleProcessInFinalStatesTest.java | 3 +-
.../prism/UpdateAtSpecificTimeTest.java | 12 +-
.../falcon/regression/ui/ProcessUITest.java | 4 +-
44 files changed, 808 insertions(+), 927 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 2d0e4c3..76c1090 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,8 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and
+ Bundle.java (Ruslan Ostafiychuk)
FALCON-1088 Fixing FeedDelayParallelTimeoutTest and renaming it to FeedDelayTest(Pragya M via
Samarth G)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
index 22ec5da..a99b307 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
@@ -23,11 +23,17 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
import org.testng.Assert;
import javax.xml.bind.JAXBException;
import java.io.StringWriter;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Class for representing a cluster xml. */
@@ -78,4 +84,24 @@ public class ClusterMerlin extends Cluster {
acl.setPermission(permission);
this.setACL(acl);
}
+
+ public void setInterface(Interfacetype interfacetype, String value) {
+ final Interfaces interfaces = this.getInterfaces();
+ final List<Interface> interfaceList = interfaces.getInterfaces();
+ for (final Interface anInterface : interfaceList) {
+ if (anInterface.getType() == interfacetype) {
+ anInterface.setEndpoint(value);
+ }
+ }
+ }
+
+ public void setWorkingLocationPath(String path) {
+ for (Location location : getLocations().getLocations()) {
+ if (location.getName() == ClusterLocationType.WORKING) {
+ location.setPath(path);
+ break;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
index 70e2e73..1b59227 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
@@ -328,4 +328,50 @@ public class FeedMerlin extends Feed {
this.setSla(sla);
}
+ /**
+ * Sets new feed data path (for first location).
+ *
+ * @param path new feed data path
+ */
+ public void setFilePath(String path) {
+ getLocations().getLocations().get(0).setPath(path);
+ }
+
+
+ /**
+ * Retrieves prefix (main sub-folders) of first feed data path.
+ */
+ public String getFeedPrefix() {
+ String path = getLocations().getLocations().get(0).getPath();
+ return path.substring(0, path.indexOf('$'));
+ }
+
+ public void setValidity(String feedStart, String feedEnd) {
+ this.getClusters().getClusters().get(0).getValidity()
+ .setStart(TimeUtil.oozieDateToDate(feedStart).toDate());
+ this.getClusters().getClusters().get(0).getValidity()
+ .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate());
+
+ }
+
+ public void setDataLocationPath(String path) {
+ final List<Location> locations = this.getLocations().getLocations();
+ for (Location location : locations) {
+ if (location.getType() == LocationType.DATA) {
+ location.setPath(path);
+ }
+ }
+ }
+
+ public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) {
+ Frequency frq = new Frequency(String.valueOf(frequency), periodicity);
+ this.setFrequency(frq);
+ }
+
+ public void setTableUri(String tableUri) {
+ final CatalogTable catalogTable = new CatalogTable();
+ catalogTable.setUri(tableUri);
+ this.setTable(catalogTable);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 01fdd04..8732a44 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Sla;
import org.apache.falcon.entity.v0.process.ACL;
import org.apache.falcon.entity.v0.process.Cluster;
@@ -35,6 +36,7 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Properties;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.testng.Assert;
@@ -268,6 +270,149 @@ public class ProcessMerlin extends Process {
this.setSla(sla);
}
+ /**
+ * Sets new process validity on all the process clusters.
+ *
+ * @param startTime start of process validity
+ * @param endTime end of process validity
+ */
+ public void setValidity(String startTime, String endTime) {
+
+ for (Cluster cluster : this.getClusters().getClusters()) {
+ cluster.getValidity().setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+ cluster.getValidity().setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+ }
+ }
+
+ /**
+ * Adds one output into process.
+ */
+ public void addOutputFeed(String outputName, String feedName) {
+ Output out1 = getOutputs().getOutputs().get(0);
+ Output out2 = new Output();
+ out2.setFeed(feedName);
+ out2.setName(outputName);
+ out2.setInstance(out1.getInstance());
+ getOutputs().getOutputs().add(out2);
+ }
+
+
+
+ /**
+ * Adds one input into process.
+ */
+ public void addInputFeed(String inputName, String feedName) {
+ Input in1 = getInputs().getInputs().get(0);
+ Input in2 = new Input();
+ in2.setEnd(in1.getEnd());
+ in2.setFeed(feedName);
+ in2.setName(inputName);
+ in2.setPartition(in1.getPartition());
+ in2.setStart(in1.getStart());
+ in2.setOptional(in1.isOptional());
+ getInputs().getInputs().add(in2);
+ }
+
+
+ public void setInputFeedWithEl(String inputFeedName, String startEl, String endEl) {
+ Inputs inputs = new Inputs();
+ Input input = new Input();
+ input.setFeed(inputFeedName);
+ input.setStart(startEl);
+ input.setEnd(endEl);
+ input.setName("inputData");
+ inputs.getInputs().add(input);
+ this.setInputs(inputs);
+ }
+
+ public void setDatasetInstances(String startInstance, String endInstance) {
+ this.getInputs().getInputs().get(0).setStart(startInstance);
+ this.getInputs().getInputs().get(0).setEnd(endInstance);
+ }
+
+ public void setProcessInputStartEnd(String start, String end) {
+ for (Input input : this.getInputs().getInputs()) {
+ input.setStart(start);
+ input.setEnd(end);
+ }
+ }
+
+ /**
+ * Sets name(s) of the process output(s).
+ *
+ * @param names new names of the outputs
+ */
+ public void setOutputNames(String... names) {
+ Outputs outputs = this.getOutputs();
+ Assert.assertEquals(outputs.getOutputs().size(), names.length,
+ "Number of output names is not equal to number of outputs in process");
+ for (int i = 0; i < names.length; i++) {
+ outputs.getOutputs().get(i).setName(names[i]);
+ }
+ this.setOutputs(outputs);
+ }
+
+
+ /**
+ * Sets partition for each input, according to number of supplied partitions.
+ *
+ * @param partition partitions to be set
+ */
+ public void setInputPartition(String... partition) {
+ for (int i = 0; i < partition.length; i++) {
+ this.getInputs().getInputs().get(i).setPartition(partition[i]);
+ }
+ }
+
+ /**
+ * Adds optional property to process definition.
+ *
+ * @param properties desired properties to be added
+ */
+ public void addProperties(Property... properties) {
+ for (Property property : properties) {
+ this.getProperties().getProperties().add(property);
+ }
+ }
+
+ /**
+ * Changes names of process inputs.
+ *
+ * @param names desired names of inputs
+ */
+ public void setInputNames(String... names) {
+ for (int i = 0; i < names.length; i++) {
+ this.getInputs().getInputs().get(i).setName(names[i]);
+ }
+ }
+
+ public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) {
+ Frequency frq = new Frequency(String.valueOf(frequency), periodicity);
+ this.setFrequency(frq);
+ }
+
+ public void setTimeOut(int magnitude, Frequency.TimeUnit unit) {
+ Frequency frq = new Frequency(String.valueOf(magnitude), unit);
+ this.setTimeout(frq);
+ }
+
+
+
+ public void setWorkflow(String wfPath, String libPath, EngineType engineType) {
+ Workflow w = this.getWorkflow();
+ if (engineType != null) {
+ w.setEngine(engineType);
+ }
+ if (libPath != null) {
+ w.setLib(libPath);
+ }
+ w.setPath(wfPath);
+ this.setWorkflow(w);
+ }
+
+ public String getFirstInputName() {
+ return getInputs().getInputs().get(0).getName();
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index b0fa0a5..2f43972 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -21,24 +21,14 @@ package org.apache.falcon.regression.core.bundle;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.process.Retry;
-import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.Entities.ProcessMerlin;
@@ -50,9 +40,6 @@ import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import org.testng.Assert;
import javax.xml.bind.JAXBException;
@@ -228,7 +215,7 @@ public class Bundle {
*/
public void generateUniqueBundle(String prefix) {
/* creating new names */
- List<ClusterMerlin> clusterMerlinList = BundleUtil.fromString(clusters);
+ List<ClusterMerlin> clusterMerlinList = BundleUtil.getClustersFromStrings(clusters);
Map<String, String> clusterNameMap = new HashMap<String, String>();
for (ClusterMerlin clusterMerlin : clusterMerlinList) {
clusterNameMap.putAll(clusterMerlin.setUniqueName(prefix));
@@ -309,14 +296,7 @@ public class Bundle {
*/
public void setProcessInput(String startEl, String endEl) {
ProcessMerlin process = getProcessObject();
- Inputs inputs = new Inputs();
- Input input = new Input();
- input.setFeed(Util.readEntityName(getInputFeedFromBundle()));
- input.setStart(startEl);
- input.setEnd(endEl);
- input.setName("inputData");
- inputs.getInputs().add(input);
- process.setInputs(inputs);
+ process.setInputFeedWithEl(Util.readEntityName(getInputFeedFromBundle()), startEl, endEl);
this.setProcessData(process.toString());
}
@@ -342,10 +322,7 @@ public class Bundle {
public void setFeedValidity(String feedStart, String feedEnd, String feedName) {
FeedMerlin feedElement = getFeedElement(feedName);
- feedElement.getClusters().getClusters().get(0).getValidity()
- .setStart(TimeUtil.oozieDateToDate(feedStart).toDate());
- feedElement.getClusters().getClusters().get(0).getValidity()
- .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate());
+ feedElement.setValidity(feedStart, feedEnd);
writeFeedElement(feedElement, feedName);
}
@@ -377,24 +354,19 @@ public class Bundle {
public void setDatasetInstances(String startInstance, String endInstance) {
ProcessMerlin processElement = getProcessObject();
- processElement.getInputs().getInputs().get(0).setStart(startInstance);
- processElement.getInputs().getInputs().get(0).setEnd(endInstance);
+ processElement.setDatasetInstances(startInstance, endInstance);
setProcessData(processElement.toString());
}
public void setProcessPeriodicity(int frequency, TimeUnit periodicity) {
ProcessMerlin processElement = getProcessObject();
- Frequency frq = new Frequency("" + frequency, periodicity);
- processElement.setFrequency(frq);
+ processElement.setPeriodicity(frequency, periodicity);
setProcessData(processElement.toString());
}
public void setProcessInputStartEnd(String start, String end) {
ProcessMerlin processElement = getProcessObject();
- for (Input input : processElement.getInputs().getInputs()) {
- input.setStart(start);
- input.setEnd(end);
- }
+ processElement.setProcessInputStartEnd(start, end);
setProcessData(processElement.toString());
}
@@ -421,23 +393,10 @@ public class Bundle {
}
public void setOutputFeedLocationData(String path) {
- ProcessMerlin processElement = new ProcessMerlin(processData);
- String outputDataset = null;
- int datasetIndex;
- for (datasetIndex = 0; datasetIndex < dataSets.size(); datasetIndex++) {
- outputDataset = dataSets.get(datasetIndex);
- if (outputDataset.contains(processElement.getOutputs().getOutputs().get(0).getFeed())) {
- break;
- }
- }
-
- FeedMerlin feedElement = new FeedMerlin(outputDataset);
- Location l = new Location();
- l.setPath(path);
- l.setType(LocationType.DATA);
- feedElement.getLocations().getLocations().set(0, l);
- dataSets.set(datasetIndex, feedElement.toString());
- LOGGER.info("modified location path dataSet is: " + dataSets.get(datasetIndex));
+ FeedMerlin feedElement = getFeedElement(getOutputFeedNameFromBundle());
+ feedElement.setDataLocationPath(path);
+ writeFeedElement(feedElement, feedElement.getName());
+ LOGGER.info("modified location path dataSet is: " + feedElement);
}
public void setProcessConcurrency(int concurrency) {
@@ -456,15 +415,7 @@ public class Bundle {
public void setProcessWorkflow(String wfPath, String libPath, EngineType engineType) {
ProcessMerlin processElement = getProcessObject();
- Workflow w = processElement.getWorkflow();
- if (engineType != null) {
- w.setEngine(engineType);
- }
- if (libPath != null) {
- w.setLib(libPath);
- }
- w.setPath(wfPath);
- processElement.setWorkflow(w);
+ processElement.setWorkflow(wfPath, libPath, engineType);
setProcessData(processElement.toString());
}
@@ -501,8 +452,7 @@ public class Bundle {
public void setInputFeedPeriodicity(int frequency, TimeUnit periodicity) {
String feedName = getInputFeedNameFromBundle();
FeedMerlin feedElement = getFeedElement(feedName);
- Frequency frq = new Frequency("" + frequency, periodicity);
- feedElement.setFrequency(frq);
+ feedElement.setPeriodicity(frequency, periodicity);
writeFeedElement(feedElement, feedName);
}
@@ -520,12 +470,7 @@ public class Bundle {
public void setInputFeedDataPath(String path) {
String feedName = getInputFeedNameFromBundle();
FeedMerlin feedElement = getFeedElement(feedName);
- final List<Location> locations = feedElement.getLocations().getLocations();
- for (Location location : locations) {
- if (location.getType() == LocationType.DATA) {
- locations.get(0).setPath(path);
- }
- }
+ feedElement.setDataLocationPath(path);
writeFeedElement(feedElement, feedName);
}
@@ -536,40 +481,9 @@ public class Bundle {
.getPath());
}
- public void setProcessValidity(DateTime startDate, DateTime endDate) {
-
- DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-
- String start = formatter.print(startDate).replace("/", "T") + "Z";
- String end = formatter.print(endDate).replace("/", "T") + "Z";
-
- ProcessMerlin processElement = new ProcessMerlin(processData);
-
- for (Cluster cluster : processElement.getClusters().getClusters()) {
-
- org.apache.falcon.entity.v0.process.Validity validity =
- new org.apache.falcon.entity.v0.process.Validity();
- validity.setStart(TimeUtil.oozieDateToDate(start).toDate());
- validity.setEnd(TimeUtil.oozieDateToDate(end).toDate());
- cluster.setValidity(validity);
-
- }
-
- processData = processElement.toString();
- }
-
public void setProcessValidity(String startDate, String endDate) {
ProcessMerlin processElement = new ProcessMerlin(processData);
-
- for (Cluster cluster : processElement.getClusters().getClusters()) {
- org.apache.falcon.entity.v0.process.Validity validity =
- new org.apache.falcon.entity.v0.process.Validity();
- validity.setStart(TimeUtil.oozieDateToDate(startDate).toDate());
- validity.setEnd(TimeUtil.oozieDateToDate(endDate).toDate());
- cluster.setValidity(validity);
-
- }
-
+ processElement.setValidity(startDate, endDate);
processData = processElement.toString();
}
@@ -603,16 +517,9 @@ public class Bundle {
}
}
- public void addProcessInput(String feed, String feedName) {
+ public void addProcessInput(String inputName, String feedName) {
ProcessMerlin processElement = getProcessObject();
- Input in1 = processElement.getInputs().getInputs().get(0);
- Input in2 = new Input();
- in2.setEnd(in1.getEnd());
- in2.setFeed(feed);
- in2.setName(feedName);
- in2.setPartition(in1.getPartition());
- in2.setStart(in1.getStart());
- processElement.getInputs().getInputs().add(in2);
+ processElement.addInputFeed(inputName, feedName);
setProcessData(processElement.toString());
}
@@ -654,43 +561,27 @@ public class Bundle {
public void setClusterInterface(Interfacetype interfacetype, String value) {
ClusterMerlin c = getClusterElement();
- final Interfaces interfaces = c.getInterfaces();
- final List<Interface> interfaceList = interfaces.getInterfaces();
- for (final Interface anInterface : interfaceList) {
- if (anInterface.getType() == interfacetype) {
- anInterface.setEndpoint(value);
- }
- }
+ c.setInterface(interfacetype, value);
writeClusterElement(c);
}
public void setInputFeedTableUri(String tableUri) {
final String feedStr = getInputFeedFromBundle();
FeedMerlin feed = new FeedMerlin(feedStr);
- final CatalogTable catalogTable = new CatalogTable();
- catalogTable.setUri(tableUri);
- feed.setTable(catalogTable);
+ feed.setTableUri(tableUri);
writeFeedElement(feed, feed.getName());
}
public void setOutputFeedTableUri(String tableUri) {
final String feedStr = getOutputFeedFromBundle();
FeedMerlin feed = new FeedMerlin(feedStr);
- final CatalogTable catalogTable = new CatalogTable();
- catalogTable.setUri(tableUri);
- feed.setTable(catalogTable);
+ feed.setTableUri(tableUri);
writeFeedElement(feed, feed.getName());
}
public void setCLusterWorkingPath(String clusterData, String path) {
ClusterMerlin c = new ClusterMerlin(clusterData);
- for (int i = 0; i < c.getLocations().getLocations().size(); i++) {
- if (c.getLocations().getLocations().get(i).getName().equals(ClusterLocationType.WORKING)) {
- c.getLocations().getLocations().get(i).setPath(path);
- }
- }
-
- //this.setClusterData(clusterData)
+ c.setWorkingLocationPath(path);
writeClusterElement(c);
}
@@ -795,17 +686,13 @@ public class Bundle {
public void setProcessLibPath(String libPath) {
ProcessMerlin processElement = getProcessObject();
- Workflow wf = processElement.getWorkflow();
- wf.setLib(libPath);
- processElement.setWorkflow(wf);
+ processElement.getWorkflow().setLib(libPath);
setProcessData(processElement.toString());
-
}
public void setProcessTimeOut(int magnitude, TimeUnit unit) {
ProcessMerlin processElement = getProcessObject();
- Frequency frq = new Frequency("" + magnitude, unit);
- processElement.setTimeout(frq);
+ processElement.setTimeOut(magnitude, unit);
setProcessData(processElement.toString());
}
@@ -901,9 +788,7 @@ public class Bundle {
*/
public void setProcessInputNames(String... names) {
ProcessMerlin p = new ProcessMerlin(processData);
- for (int i = 0; i < names.length; i++) {
- p.getInputs().getInputs().get(i).setName(names[i]);
- }
+ p.setInputNames(names);
processData = p.toString();
}
@@ -914,9 +799,7 @@ public class Bundle {
*/
public void addProcessProperty(Property... properties) {
ProcessMerlin p = new ProcessMerlin(processData);
- for (Property property : properties) {
- p.getProperties().getProperties().add(property);
- }
+ p.addProperties(properties);
processData = p.toString();
}
@@ -927,9 +810,7 @@ public class Bundle {
*/
public void setProcessInputPartition(String... partition) {
ProcessMerlin p = new ProcessMerlin(processData);
- for (int i = 0; i < partition.length; i++) {
- p.getInputs().getInputs().get(i).setPartition(partition[i]);
- }
+ p.setInputPartition(partition);
processData = p.toString();
}
@@ -940,46 +821,23 @@ public class Bundle {
*/
public void setProcessOutputNames(String... names) {
ProcessMerlin p = new ProcessMerlin(processData);
- Outputs outputs = p.getOutputs();
- Assert.assertEquals(outputs.getOutputs().size(), names.length,
- "Number of output names is not equal to number of outputs in process");
- for (int i = 0; i < names.length; i++) {
- outputs.getOutputs().get(i).setName(names[i]);
- }
- p.setOutputs(outputs);
+ p.setOutputNames(names);
processData = p.toString();
}
- public void addInputFeedToBundle(String feedRefName, String feed, int templateInputIdx) {
- this.getDataSets().add(feed);
- String feedName = Util.readEntityName(feed);
- String vProcessData = getProcessData();
-
- ProcessMerlin processObject = new ProcessMerlin(vProcessData);
- final List<Input> processInputs = processObject.getInputs().getInputs();
- Input templateInput = processInputs.get(templateInputIdx);
- Input newInput = new Input();
- newInput.setFeed(feedName);
- newInput.setName(feedRefName);
- newInput.setOptional(templateInput.isOptional());
- newInput.setStart(templateInput.getStart());
- newInput.setEnd(templateInput.getEnd());
- newInput.setPartition(templateInput.getPartition());
- processInputs.add(newInput);
+ public void addInputFeedToBundle(String feedRefName, Feed feed) {
+ this.getDataSets().add(feed.toString());
+
+ ProcessMerlin processObject = new ProcessMerlin(processData);
+ processObject.addInputFeed(feedRefName, feed.getName());
setProcessData(processObject.toString());
}
- public void addOutputFeedToBundle(String feedRefName, String feed, int templateOutputIdx) {
- this.getDataSets().add(feed);
- String feedName = Util.readEntityName(feed);
+ public void addOutputFeedToBundle(String feedRefName, Feed feed) {
+ this.getDataSets().add(feed.toString());
+
ProcessMerlin processObject = getProcessObject();
- final List<Output> processOutputs = processObject.getOutputs().getOutputs();
- Output templateOutput = processOutputs.get(templateOutputIdx);
- Output newOutput = new Output();
- newOutput.setFeed(feedName);
- newOutput.setName(feedRefName);
- newOutput.setInstance(templateOutput.getInstance());
- processOutputs.add(newOutput);
+ processObject.addOutputFeed(feedRefName, feed.getName());
setProcessData(processObject.toString());
}
@@ -1000,26 +858,12 @@ public class Bundle {
public String getInputFeedFromBundle() {
ProcessMerlin processObject = new ProcessMerlin(getProcessData());
- for (Input input : processObject.getInputs().getInputs()) {
- for (String feed : getDataSets()) {
- if (Util.readEntityName(feed).equalsIgnoreCase(input.getFeed())) {
- return feed;
- }
- }
- }
- return null;
+ return getFeed(processObject.getInputs().getInputs().get(0).getFeed());
}
public String getOutputFeedFromBundle() {
ProcessMerlin processObject = new ProcessMerlin(getProcessData());
- for (Output output : processObject.getOutputs().getOutputs()) {
- for (String feed : getDataSets()) {
- if (Util.readEntityName(feed).equalsIgnoreCase(output.getFeed())) {
- return feed;
- }
- }
- }
- return null;
+ return getFeed(processObject.getOutputs().getOutputs().get(0).getFeed());
}
public String getOutputFeedNameFromBundle() {
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 0b2c4e1..a825845 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -237,7 +237,7 @@ public final class BundleUtil {
return property;
}
- public static List<ClusterMerlin> fromString(List<String> clusterStrings) {
+ public static List<ClusterMerlin> getClustersFromStrings(List<String> clusterStrings) {
List<ClusterMerlin> clusters = new ArrayList<ClusterMerlin>();
for (String clusterString : clusterStrings) {
clusters.add(new ClusterMerlin(clusterString));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 4620787..6c90256 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -26,10 +26,6 @@ import com.google.gson.JsonSyntaxException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
import org.apache.falcon.regression.core.helpers.ColoHelper;
@@ -527,37 +523,6 @@ public final class InstanceUtil {
return InstanceUtil.sendRequestProcessInstance(url, user);
}
- /**
- * Retrieves prefix (main sub-folders) of feed data path.
- */
- public static String getFeedPrefix(String feed) {
- FeedMerlin feedElement = new FeedMerlin(feed);
- String locationPath = feedElement.getLocations().getLocations().get(0).getPath();
- locationPath = locationPath.substring(0, locationPath.indexOf('$'));
- return locationPath;
- }
-
- /**
- * Adds one input into process.
- *
- * @param process - where input should be inserted
- * @param feed - feed which will be used as input feed
- * @return - string representation of process definition
- */
- public static String addProcessInputFeed(String process, String feed, String feedName) {
-
- ProcessMerlin processElement = new ProcessMerlin(process);
- Input in1 = processElement.getInputs().getInputs().get(0);
- Input in2 = new Input();
- in2.setEnd(in1.getEnd());
- in2.setFeed(feed);
- in2.setName(feedName);
- in2.setPartition(in1.getPartition());
- in2.setStart(in1.getStart());
- processElement.getInputs().getInputs().add(in2);
- return processElement.toString();
- }
-
public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord(
ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
@@ -598,21 +563,6 @@ public final class InstanceUtil {
return actionInfo.getRun();
}
-
- /**
- * Sets new feed data path.
- *
- * @param feed feed which is to be modified
- * @param path new feed data path
- * @return modified feed
- */
- public static String setFeedFilePath(String feed, String path) {
-
- FeedMerlin feedElement = new FeedMerlin(feed);
- feedElement.getLocations().getLocations().get(0).setPath(path);
- return feedElement.toString();
- }
-
public static int checkIfFeedCoordExist(AbstractEntityHelper helper,
String feedName, String coordType) throws OozieClientException {
LOGGER.info("feedName: " + feedName);
@@ -640,47 +590,6 @@ public final class InstanceUtil {
return numberOfCoord;
}
- /**
- * Sets process frequency.
- *
- * @return modified process definition
- */
- public static String setProcessFrequency(String process, Frequency frequency) {
- ProcessMerlin p = new ProcessMerlin(process);
- p.setFrequency(frequency);
- return p.toString();
- }
-
- /**
- * Sets new process name.
- */
- public static String setProcessName(String process, String newName) {
- ProcessMerlin p = new ProcessMerlin(process);
- p.setName(newName);
- return p.toString();
- }
-
- /**
- * Sets new process validity on all the process clusters.
- *
- * @param process process entity to be modified
- * @param startTime start of process validity
- * @param endTime end of process validity
- * @return modified process definition
- */
- public static String setProcessValidity(String process,
- String startTime, String endTime) {
- ProcessMerlin processElement = new ProcessMerlin(process);
-
- for (int i = 0; i < processElement.getClusters().getClusters().size(); i++) {
- processElement.getClusters().getClusters().get(i).getValidity().setStart(
- TimeUtil.oozieDateToDate(startTime).toDate());
- processElement.getClusters().getClusters().get(i).getValidity()
- .setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
- }
- return processElement.toString();
- }
-
public static List<CoordinatorAction> getProcessInstanceListFromAllBundles(
ColoHelper coloHelper, String processName, EntityType entityType)
throws OozieClientException {
@@ -950,17 +859,6 @@ public final class InstanceUtil {
}
/**
- * Sets feed frequency.
- *
- * @return modified feed
- */
- public static String setFeedFrequency(String feed, Frequency f) {
- FeedMerlin feedElement = new FeedMerlin(feed);
- feedElement.setFrequency(f);
- return feedElement.toString();
- }
-
- /**
* Waits till instances of specific job will be created during specific time.
* Use this method directly in unusual test cases where timeouts are different from trivial.
* In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index eaa69f0..02280f3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -287,36 +287,32 @@ public class AuthorizationTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
//check that there are 3 running instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
//check that there are 2 waiting instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
+ CoordinatorAction.Status.WAITING, EntityType.PROCESS);
//3 instances should be running , other 2 should be waiting
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
- .readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime);
InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
//suspend 3 running instances
- r = prism.getProcessHelper().getProcessInstanceSuspend(Util
- .readEntityName(bundles[0].getProcessData()),
+ r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + midTime);
InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
//try to resume suspended instances by U2
- r = prism.getProcessHelper().getProcessInstanceResume(Util.readEntityName(bundles[0]
- .getProcessData()), "?start=" + startTime + "&end=" + midTime,
- MerlinConstants.USER2_NAME);
+ r = prism.getProcessHelper().getProcessInstanceResume(bundles[0].getProcessName(), "?start=" + startTime
+ + "&end=" + midTime, MerlinConstants.USER2_NAME);
//the state of above 3 instances should still be suspended
InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
//check the status of all instances
- r = prism.getProcessHelper().getProcessInstanceStatus(Util
- .readEntityName(bundles[0].getProcessData()),
+ r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime);
InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0);
}
@@ -353,18 +349,16 @@ public class AuthorizationTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
//check that there are 3 running instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
//3 instances should be running , other 2 should be waiting
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
- .readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime);
InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
//try to kill all instances by U2
- r = prism.getProcessHelper().getProcessInstanceKill(Util
- .readEntityName(bundles[0].getProcessData()),
+ r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
//number of instances should be the same as before
@@ -401,28 +395,25 @@ public class AuthorizationTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
//check that there are 3 running instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
//check that there are 2 waiting instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
+ CoordinatorAction.Status.WAITING, EntityType.PROCESS);
//3 instances should be running , other 2 should be waiting
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
- .readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime);
InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
//suspend 3 running instances
- r = prism.getProcessHelper().getProcessInstanceSuspend(Util
- .readEntityName(bundles[0].getProcessData()),
+ r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + midTime);
InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
//try to kill all instances by U2
- r = prism.getProcessHelper().getProcessInstanceKill(Util
- .readEntityName(bundles[0].getProcessData()),
+ r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
//3 should still be suspended, 2 should be waiting
@@ -465,29 +456,24 @@ public class AuthorizationTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
//check that there are 4 running instances
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 4, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 4,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
//4 instances should be running , 1 should be waiting
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
- .readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
"?start=" + startTime + "&end=" + endTime);
InstanceUtil.validateResponse(r, 5, 4, 0, 1, 0);
//kill 3 running instances
- r = prism.getProcessHelper().getProcessInstanceKill(Util
- .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end="
- +
- midTime);
+ r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
+ "?start=" + startTime + "&end=" + midTime);
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
//generally 3 instances should be killed, 1 is running and 1 is waiting
//try to rerun instances by U2
- r = prism.getProcessHelper().getProcessInstanceRerun(Util
- .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end="
- +
- midTime, MerlinConstants.USER2_NAME);
+ r = prism.getProcessHelper().getProcessInstanceRerun(bundles[0].getProcessName(),
+ "?start=" + startTime + "&end=" + midTime, MerlinConstants.USER2_NAME);
//instances should still be killed
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
@@ -510,7 +496,7 @@ public class AuthorizationTest extends BaseTestClass {
Assert.assertTrue(definition.contains(feed.getName()) && !definition.contains("(feed) not found"),
"Feed should be already submitted");
//update feed definition
- FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ FeedMerlin newFeed = new FeedMerlin(feed);
newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//try to update feed by U2
final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
@@ -589,8 +575,7 @@ public class AuthorizationTest extends BaseTestClass {
@Test(enabled = false)
public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception {
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- String process = bundles[0].getProcessData();
- process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
@@ -600,13 +585,13 @@ public class AuthorizationTest extends BaseTestClass {
//by U2 schedule process dependant on scheduled feed by U1
ServiceResponse serviceResponse = prism.getProcessHelper()
- .submitAndSchedule(process, MerlinConstants.USER2_NAME);
+ .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
AssertUtil.assertSucceeded(serviceResponse);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+ .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
String oldProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
@@ -615,7 +600,7 @@ public class AuthorizationTest extends BaseTestClass {
String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
//update feed definition
- FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ FeedMerlin newFeed = new FeedMerlin(feed);
newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//update feed by U1
@@ -630,7 +615,7 @@ public class AuthorizationTest extends BaseTestClass {
Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same");
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+ OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
String newProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -641,8 +626,7 @@ public class AuthorizationTest extends BaseTestClass {
@Test(enabled = false)
public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Exception {
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- String process = bundles[0].getProcessData();
- process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
@@ -651,18 +635,18 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING);
//by U2 schedule process dependent on scheduled feed by U1
- ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process,
+ ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(),
MerlinConstants.USER2_NAME);
AssertUtil.assertSucceeded(serviceResponse);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//update feed definition
- FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ FeedMerlin newFeed = new FeedMerlin(feed);
newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//get old process details
String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+ .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
String oldProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
@@ -682,7 +666,7 @@ public class AuthorizationTest extends BaseTestClass {
Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser);
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+ OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
String newProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -693,8 +677,7 @@ public class AuthorizationTest extends BaseTestClass {
@Test(enabled = false)
public void u1ScheduleFeedU1ScheduleDependantProcessU1UpdateProcess() throws Exception {
String feed = bundles[0].getInputFeedFromBundle();
- String process = bundles[0].getProcessData();
- process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
@@ -703,13 +686,13 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
//by U1 schedule process dependent on scheduled feed by U1
- ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process);
+ ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData());
AssertUtil.assertSucceeded(serviceResponse);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+ .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
String oldProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
@@ -718,16 +701,16 @@ public class AuthorizationTest extends BaseTestClass {
.getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
//update process by U1
- ProcessMerlin processObj = new ProcessMerlin(process);
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setProperty("randomProp", "randomVal");
- serviceResponse = prism.getProcessHelper().update(process, processObj.toString());
+ serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString());
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should not be created
OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
//new process bundle should be created by U1
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+ OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
String newProcessUser =
getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -738,8 +721,7 @@ public class AuthorizationTest extends BaseTestClass {
@Test(enabled = false)
public void u1ScheduleFeedU1ScheduleDependantProcessU2UpdateProcess() throws Exception {
String feed = bundles[0].getInputFeedFromBundle();
- String process = bundles[0].getProcessData();
- process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
@@ -748,13 +730,13 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
//by U1 schedule process dependent on scheduled feed by U1
- ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process);
+ ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData());
AssertUtil.assertSucceeded(serviceResponse);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+ .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
String oldProcessUser =
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
@@ -763,9 +745,9 @@ public class AuthorizationTest extends BaseTestClass {
.getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
//update process by U2
- ProcessMerlin processObj = new ProcessMerlin(process);
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setProperty("randomProp", "randomVal");
- serviceResponse = prism.getProcessHelper().update(process, processObj.toString(),
+ serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString(),
TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.USER2_NAME);
AssertUtil.assertSucceeded(serviceResponse);
@@ -773,7 +755,7 @@ public class AuthorizationTest extends BaseTestClass {
OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+ OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
String newProcessUser =
getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same");
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
index b7eb77f..33b0e77 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
@@ -18,7 +18,6 @@
package org.apache.falcon.regression;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -149,7 +148,7 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException {
List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
+ bundle.getProcessName(), EntityType.PROCESS);
String coordID = bundles.get(0);
List<String> missingDependencies =
OozieUtil.getMissingDependencies(prismHelper, coordID);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
index 41e3002..07292e1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -19,7 +19,6 @@
package org.apache.falcon.regression;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.BundleUtil;
@@ -157,7 +156,7 @@ public class ELValidationsTest extends BaseTestClass {
List<String> bundles = null;
for (int i = 0; i < 10; ++i) {
bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
+ bundle.getProcessName(), EntityType.PROCESS);
if (bundles.size() > 0) {
break;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
index 6b227d6..8eff8e4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
@@ -143,30 +143,30 @@ public class ExternalFSTest extends BaseTestClass{
new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator);
//configure feed
- String feed = bundles[0].getDataSets().get(0);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
String targetDataLocation = endpoint + testWasbTargetDir + datePattern;
- feed = InstanceUtil.setFeedFilePath(feed, sourcePath + '/' + datePattern);
+ feed.setFilePath(sourcePath + '/' + datePattern);
//erase all clusters from feed definition
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
//set local cluster as source
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ .build());
//set externalFS cluster as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
- .build()).toString();
+ .build());
//submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator);
//upload necessary data
DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -181,15 +181,15 @@ public class ExternalFSTest extends BaseTestClass{
Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
//check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed),
+ .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()),
"REPLICATION"), 1);
TimeUtil.sleepSeconds(10);
//replication should start, wait while it ends
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed), 1,
+ InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
//check if data has been replicated correctly
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
index a6639ed..6dbe60e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
@@ -77,7 +77,7 @@ public class FeedInstanceListingTest extends BaseTestClass{
bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index 2c8346d..61ef8f3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -87,32 +87,32 @@ public class FeedLateRerunTest extends BaseTestClass {
LOGGER.info("Time range between : " + startTime + " and " + endTime);
//configure feed
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(feedDataLocation);
//erase all clusters from feed definition
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
//set cluster1 as source
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ .build());
//set cluster2 as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
+ feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
- .build()).toString();
- String entityName = Util.readEntityName(feed);
+ .build());
+ String entityName = feed.getName();
//submit and schedule feed
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
//check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
Assert.assertEquals(InstanceUtil
.checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index eb8c4fe..de4f692 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -112,29 +112,29 @@ public class FeedReplicationTest extends BaseTestClass {
LOGGER.info("Time range between : " + startTime + " and " + endTime);
//configure feed
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(feedDataLocation);
//erase all clusters from feed definition
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
//set cluster1 as source
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.SOURCE)
+ .build());
//set cluster2 as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.TARGET)
+ .withDataLocation(targetDataLocation)
+ .build());
//submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
//upload necessary data
DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -152,15 +152,15 @@ public class FeedReplicationTest extends BaseTestClass {
}
//check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed.toString()),
+ "REPLICATION"), 1);
//replication should start, wait while it ends
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
//check if data has been replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
@@ -191,37 +191,37 @@ public class FeedReplicationTest extends BaseTestClass {
LOGGER.info("Time range between : " + startTime + " and " + endTime);
//configure feed
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(feedDataLocation);
//erase all clusters from feed definition
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
//set cluster1 as source
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.SOURCE)
+ .build());
//set cluster2 as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.TARGET)
+ .withDataLocation(targetDataLocation)
+ .build());
//set cluster3 as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.TARGET)
+ .withDataLocation(targetDataLocation)
+ .build());
//submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
//upload necessary data
DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -240,23 +240,23 @@ public class FeedReplicationTest extends BaseTestClass {
}
//check if all coordinators exist
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
- InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, feed.toString(), 0);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
+ "REPLICATION"), 1);
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
- "REPLICATION"), 1);
+ .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
+ "REPLICATION"), 1);
//replication on cluster 2 should start, wait till it ends
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
//replication on cluster 3 should start, wait till it ends
- InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+ InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
//check if data has been replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
@@ -299,29 +299,29 @@ public class FeedReplicationTest extends BaseTestClass {
FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
feedElement.setAvailabilityFlag(availabilityFlagName);
bundles[0].writeFeedElement(feedElement, feedName);
- String feed = bundles[0].getDataSets().get(0);
- feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setFilePath(feedDataLocation);
//erase all clusters from feed definition
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+ feed.clearFeedClusters();
//set cluster1 as source
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.SOURCE)
+ .build());
//set cluster2 as target
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build()).toString();
+ feed.addFeedCluster(
+ new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+ .withRetention("days(1000000)", ActionType.DELETE)
+ .withValidity(startTime, endTime)
+ .withClusterType(ClusterType.TARGET)
+ .withDataLocation(targetDataLocation)
+ .build());
//submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
//upload necessary data
DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -339,7 +339,7 @@ public class FeedReplicationTest extends BaseTestClass {
}
//check while instance is got created
- InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
//check if coordinator exists
Assert.assertEquals(InstanceUtil
@@ -357,12 +357,12 @@ public class FeedReplicationTest extends BaseTestClass {
OSUtil.OOZIE_EXAMPLE_INPUT_DATA + availabilityFlagName);
//check if instance become running
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
- CoordinatorAction.Status.RUNNING, EntityType.FEED);
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+ CoordinatorAction.Status.RUNNING, EntityType.FEED);
//wait till instance succeed
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
//check if data was replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
index 4ce6026..47523d8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
@@ -149,17 +149,17 @@ public class LogMoverTest extends BaseTestClass {
private boolean validate(boolean logFlag) throws Exception {
String stagingDir= MerlinConstants.STAGING_LOCATION;
String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs";
- List<Path> logmoverPath = HadoopUtil
+ List<Path> logmoverPaths = HadoopUtil
.getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
if (logFlag) {
- for(int index=0; index < logmoverPath.size(); index++) {
- if (logmoverPath.get(index).toString().contains("SUCCEEDED")) {
+ for (Path logmoverPath : logmoverPaths) {
+ if (logmoverPath.toString().contains("SUCCEEDED")) {
return true;
}
}
} else {
- for(int index=0; index < logmoverPath.size(); index++) {
- if (logmoverPath.get(index).toString().contains("FAILED")) {
+ for (Path logmoverPath : logmoverPaths) {
+ if (logmoverPath.toString().contains("FAILED")) {
return true;
}
}
[2/3] falcon git commit: FALCON-1135 Migrate methods related to
*Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by
Ruslan Ostafiychuk
Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index 13a9776..76d033c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@ -93,7 +93,8 @@ public class NewRetryTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
- bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessValidity(TimeUtil.dateToOozieDate(startDate.toDate()),
+ TimeUtil.dateToOozieDate(endDate.toDate()));
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
@@ -135,7 +136,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -150,7 +151,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper()
.update((bundles[0].getProcessData()), bundles[0].getProcessData());
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -183,7 +184,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -205,7 +206,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -242,7 +243,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) {
TimeUtil.sleepSeconds(10);
@@ -260,7 +261,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -295,7 +296,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) {
@@ -316,7 +317,7 @@ public class NewRetryTest extends BaseTestClass {
.getMessage().contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -350,7 +351,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -360,11 +361,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(prism.getProcessHelper()
- .update(Util.readEntityName(bundles[0].getProcessData()),
+ .update(bundles[0].getProcessName(),
null).getMessage()
.contains("updated successfully"), "process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -398,7 +399,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -408,11 +409,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(
- prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().update(bundles[0].getProcessName(),
bundles[0].getProcessData()).getMessage()
.contains("updated successfully"), "process was not updated successfully");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -449,7 +450,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -460,12 +461,12 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertTrue(prism.getProcessHelper()
- .update(Util.readEntityName(bundles[0].getProcessData()),
+ .update(bundles[0].getProcessName(),
bundles[0].getProcessData()).getMessage()
.contains("updated successfully"),
"process was not updated successfully");
String newBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getProcessData()),
+ .getLatestBundleID(cluster, bundles[0].getProcessName(),
EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -503,7 +504,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
@@ -513,11 +514,11 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
Assert.assertFalse(
- prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData())
+ prism.getProcessHelper().update(bundles[0].getProcessName()
, bundles[0].getProcessData()).getMessage().contains("updated successfully"),
"process was updated successfully!!!");
String newBundleId = InstanceUtil.getLatestBundleID(cluster,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -555,7 +556,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
//now to validate all failed instances to check if they were retried or not.
validateRetry(clusterOC, bundleId,
@@ -595,7 +596,7 @@ public class NewRetryTest extends BaseTestClass {
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
for (int attempt = 0;
attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -608,7 +609,7 @@ public class NewRetryTest extends BaseTestClass {
LOGGER.info("now firing user reruns:");
for (int i = 0; i < 1; i++) {
prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ .getProcessInstanceRerun(bundles[0].getProcessName(),
"?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z"
+ "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z");
}
@@ -648,7 +649,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()),
+ bundles[0].getProcessName(),
EntityType.PROCESS).get(0);
//now to validate all failed instances to check if they were retried or not.
@@ -659,7 +660,7 @@ public class NewRetryTest extends BaseTestClass {
DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId);
InstancesResult piResult = prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ .getProcessInstanceRerun(bundles[0].getProcessName(),
"?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z"
+ "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1])
.replace("/", "T") + "Z");
@@ -702,7 +703,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
for (int i = 0; i < 10 && dates == null; ++i) {
@@ -802,7 +803,7 @@ public class NewRetryTest extends BaseTestClass {
AssertUtil.assertSucceeded(
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
for (int i = 0; i < 10 && dates == null; ++i) {
@@ -878,7 +879,7 @@ public class NewRetryTest extends BaseTestClass {
prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+ bundles[0].getProcessName(), EntityType.PROCESS).get(0);
validateRetry(clusterOC, bundleId,
(bundles[0].getProcessObject().getRetry().getAttempts()) / 2);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 59a701d..61c076b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@ -83,10 +83,9 @@ public class NoOutputProcessTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(inputPath);
bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
process.setOutputs(null);
process.setLateProcess(null);
- bundles[0].setProcessData(process.toString());
bundles[0].submitFeedsScheduleProcess(prism);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
index 8cf2862..d7331cf 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
@@ -30,7 +30,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
@@ -103,7 +102,7 @@ public class ProcessFrequencyTest extends BaseTestClass {
TimeUtil.oozieDateToDate(startDate));
HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
- final String processName = Util.readEntityName(bundles[0].getProcessData());
+ final String processName = bundles[0].getProcessName();
//InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
index 48cb59b..56e1474 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
@@ -119,11 +119,11 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
- String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+ String prefix = feed01.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
- prefix = InstanceUtil.getFeedPrefix(feed02.toString());
+ prefix = feed02.getFeedPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
@@ -179,7 +179,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
// String processEndTime = InstanceUtil.getTimeWrtSystemTime(20);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
process.clearProcessCluster();
process.addProcessCluster(
new ProcessMerlin.ProcessClusterBuilder(
@@ -192,8 +192,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
.withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
TimeUtil.addMinsToTime(processStartTime, 45))
.build());
- process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), feed02.getName(),
- feed02.getName()));
+ process.addInputFeed(feed02.getName(), feed02.getName());
//submit and schedule process
prism.getProcessHelper().submitAndSchedule(process.toString());
@@ -204,42 +203,40 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
Status.RUNNING, EntityType.PROCESS);
- final String processName = Util.readEntityName(bundles[0].getProcessData());
- InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(
- processName, "?start=" + processStartTime
- + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
+ InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
+ "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceResume(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceResume(process.getName(),
"?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
"?start=" + TimeUtil.addMinsToTime(processStartTime, 16)
+ "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(processName,
+ responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(process.getName(),
"?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
- responseInstance = prism.getProcessHelper().getProcessInstanceRerun(processName,
+ responseInstance = prism.getProcessHelper().getProcessInstanceRerun(process.getName(),
"?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
AssertUtil.assertSucceeded(responseInstance);
Assert.assertTrue(responseInstance.getInstances() != null);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index 34dfce3..f299128 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
@@ -83,7 +82,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index f558cc5..3893ffe 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
@@ -79,7 +78,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessConcurrency(6);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index c9334eb..6003ee0 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -83,7 +82,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 58936a7..635e238 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -27,7 +27,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
@@ -97,7 +96,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index 26348bd..b7fed18 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -18,7 +18,6 @@
package org.apache.falcon.regression;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -29,7 +28,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
@@ -68,7 +66,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
- processName = Util.readEntityName(bundles[0].getProcessData());
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
@@ -115,8 +113,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, new ProcessMerlin(bundles[0]
- .getProcessData()).getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z");
AssertUtil.assertSucceeded(r);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index b41cf05..40a4ad2 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -85,8 +85,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(2);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].submitAndScheduleProcess();
@@ -126,8 +125,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(2);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].submitAndScheduleProcess();
@@ -166,8 +164,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
bundles[0].setProcessValidity(startTime, endTime);
bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+ String inputName = bundles[0].getProcessObject().getFirstInputName();
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
bundles[0].setProcessConcurrency(2);
@@ -216,17 +213,17 @@ public class ProcessLateRerunTest extends BaseTestClass {
// Increase the window of input for process
bundles[0].setDatasetInstances(startInstance, endInstance);
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
- String inputName = processMerlin.getInputs().getInputs().get(0).getName();
- Input tempFeed = processMerlin.getInputs().getInputs().get(0);
+ ProcessMerlin process = bundles[0].getProcessObject();
+ String inputName = process.getFirstInputName();
+ Input tempFeed = process.getInputs().getInputs().get(0);
Input gateInput = new Input();
gateInput.setName("Gate");
gateInput.setFeed(tempFeed.getFeed());
gateInput.setEnd("now(0,1)");
gateInput.setStart("now(0,1)");
- processMerlin.getInputs().getInputs().add(gateInput);
- bundles[0].setProcessData(processMerlin.toString());
+ process.getInputs().getInputs().add(gateInput);
+ bundles[0].setProcessData(process.toString());
bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
index cd7eba4..c73140d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
@@ -80,7 +80,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleValidProcessSLA() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
new Frequency("6", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -95,7 +95,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
new Frequency("3", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -110,7 +110,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
new Frequency("2", Frequency.TimeUnit.hours));
bundles[0].setProcessData(processMerlin.toString());
@@ -131,7 +131,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
new Frequency("4", Frequency.TimeUnit.hours));
@@ -147,7 +147,7 @@ public class ProcessSLATest extends BaseTestClass {
@Test
public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
- ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processMerlin = bundles[0].getProcessObject();
processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
new Frequency("4", Frequency.TimeUnit.hours));
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
index 9886d76..ca612b8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
@@ -236,7 +236,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
prism.getFeedHelper().submitEntity(feed);
prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
- ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setWorkflow(null);
ServiceResponse response = prism.getProcessHelper().validateEntity(processObj.toString());
AssertUtil.assertFailed(response);
@@ -253,7 +253,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
prism.getFeedHelper().submitEntity(feed);
prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
- ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin processObj = bundles[0].getProcessObject();
processObj.setWorkflow(null);
ServiceResponse response = cluster.getProcessHelper().validateEntity(processObj.toString());
AssertUtil.assertFailed(response);
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
index f9fcf8d..f58b0f6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
@@ -67,20 +67,20 @@ public class EntitiesPatternSearchTest extends BaseTestClass {
//submit different clusters, feeds and processes
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
ClusterMerlin cluster = bundles[0].getClusterElement();
String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
String processNamePrefix = bundles[0].getProcessName() + '-';
String feedNamePrefix = bundles[0].getInputFeedNameFromBundle() + '-';
- List randomeNames = getPatternName();
- for (int i = 0; i < randomeNames.size(); i++) {
- process.setName(processNamePrefix + randomeNames.get(i));
+ List randomNames = getPatternName();
+ for (Object randomName : randomNames) {
+ process.setName(processNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString()));
- feed.setName(feedNamePrefix + randomeNames.get(i));
+ feed.setName(feedNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
- cluster.setName(clusterNamePrefix + randomeNames.get(i));
+ cluster.setName(clusterNamePrefix + randomName);
AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster.toString()));
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
index 13b3b88..3ae44e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
@@ -82,7 +82,7 @@ public class ListEntitiesTest extends BaseTestClass {
//submit 10 different clusters, feeds and processes
FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
ClusterMerlin cluster = bundles[0].getClusterElement();
String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
String processNamePrefix = bundles[0].getProcessName() + '-';
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
index 202298e..7d05d6b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -253,8 +253,7 @@ public class HCatProcessTest extends BaseTestClass {
feedObj.setName(inputFeed2Name);
feedObj.getTable().setUri(inputTableUri2);
- String inputFeed2 = feedObj.toString();
- bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+ bundles[0].addInputFeedToBundle("inputData2", feedObj);
String outputTableUri =
"catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -347,7 +346,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj = new FeedMerlin(outputFeed1);
feedObj.setName(outputFeed2Name);
feedObj.getTable().setUri(outputTableUri2);
- bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0);
+ bundles[0].addOutputFeedToBundle("outputData2", feedObj);
bundles[0].setProcessValidity(startDate, endDate);
bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
@@ -433,8 +432,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj = new FeedMerlin(inputFeed1);
feedObj.setName(inputFeed2Name);
feedObj.getTable().setUri(inputTableUri2);
- String inputFeed2 = feedObj.toString();
- bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+ bundles[0].addInputFeedToBundle("inputData2", feedObj);
String outputTableUri =
"catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -448,8 +446,7 @@ public class HCatProcessTest extends BaseTestClass {
FeedMerlin feedObj2 = new FeedMerlin(outputFeed1);
feedObj2.setName(outputFeed2Name);
feedObj2.getTable().setUri(outputTableUri2);
- String outputFeed2 = feedObj2.toString();
- bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0);
+ bundles[0].addOutputFeedToBundle("outputData2", feedObj2);
bundles[0].setProcessValidity(startDate, endDate);
bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 4466c13..3f6dc66 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -130,8 +130,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
- .getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -139,16 +138,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 5, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
- LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
+ LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess.toString()));
//now to update
while (Util
.parseResponse(prism.getProcessHelper()
- .update((bundles[1].getProcessData()), updatedProcess))
+ .update((bundles[1].getProcessData()), updatedProcess.toString()))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didn't SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
@@ -186,7 +184,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
@@ -256,7 +254,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(25);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -271,14 +269,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//now to update
AssertUtil.assertPartial(prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency);
+ Assert.assertEquals(process.getWorkflow().getPath(), workflowPath);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(),
- workflowPath2);
+ Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), workflowPath2);
Util.startService(cluster3.getProcessHelper());
dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
@@ -296,13 +293,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("WARNING: update did not succeed, retrying ");
TimeUtil.sleepSeconds(20);
}
- prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- workflowPath2);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), workflowPath2);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -336,7 +330,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -344,16 +338,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 7, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("7", TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
-
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
@@ -361,7 +353,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
- new ProcessMerlin(updatedProcess).getFrequency());
+ updatedProcess.getFrequency());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -376,7 +368,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String originalProcessData = bundles[1].getProcessData();
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -410,7 +402,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -443,7 +435,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
// future : should be verified using cord xml
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -452,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
int statusCount = InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS);
if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
@@ -460,7 +452,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
Assert.assertNotNull(status,
"status must not be null!");
TimeUtil.sleepSeconds(30);
@@ -494,7 +486,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -577,7 +569,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
@@ -610,7 +602,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -619,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
@@ -628,7 +620,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
@@ -670,7 +662,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -707,7 +699,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Job.Status status =
OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -716,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
- Util.readEntityName(bundles[1].getProcessData()),
+ bundles[1].getProcessName(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
@@ -725,13 +717,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
+ bundles[1].getProcessName(), EntityType.PROCESS),
oldNominalTimes, bundles[1].getProcessData(), false,
true
);
@@ -771,7 +763,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -793,13 +785,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(10);
}
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- aggregator1Path);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
@@ -839,7 +828,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -867,13 +856,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
- String prismString = getResponse(prism, bundles[1].getProcessData(), true);
- Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
- initialConcurrency + 3);
- Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
- aggregator1Path);
- Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
- bundles[1].getProcessObject().getOrder());
+ ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+ Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+ Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+ Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
@@ -913,7 +899,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(20);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -965,7 +951,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1090,7 +1076,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -1153,7 +1139,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -1220,7 +1206,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes =
@@ -1228,15 +1214,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 5, TimeUnit.minutes));
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
@@ -1272,7 +1257,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1280,18 +1265,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
- String updatedProcess = InstanceUtil
- .setProcessFrequency(bundles[1].getProcessData(),
- new Frequency("" + 1, TimeUnit.months));
- updatedProcess = InstanceUtil
- .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
- endTime);
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+ updatedProcess.setFrequency(new Frequency("1", TimeUnit.months));
+ updatedProcess.setValidity(TimeUtil.getTimeWrtSystemTime(10), endTime);
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
- prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
AssertUtil.assertSucceeded(response);
String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
@@ -1316,7 +1298,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
@@ -1351,7 +1333,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
@@ -1393,7 +1375,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
.getValidity().getEnd()));
Assert.assertEquals(InstanceUtil
.getProcessInstanceList(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
+ bundles[1].getProcessName(), EntityType.PROCESS)
.size(), getExpectedNumberOfWorkflowInstances(newStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
@@ -1409,7 +1391,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ bundles[1].getProcessName(), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1476,8 +1458,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
//update workflow.xml
hadoopFileEditor = new HadoopFileEditor(cluster1FS);
- hadoopFileEditor.edit(new ProcessMerlin(b
- .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
+ hadoopFileEditor.edit(b.getProcessObject().getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
"<!-- some comment -->");
//update
@@ -1553,7 +1534,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
throws Exception {
while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
+ bundle.getProcessName(), EntityType.PROCESS) != state) {
//keep waiting
TimeUtil.sleepSeconds(10);
}
@@ -1566,7 +1547,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
while (coord.getStatus() != state) {
TimeUtil.sleepSeconds(10);
coord = getDefaultOozieCoord(coloHelper, InstanceUtil
- .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
+ .getLatestBundleID(coloHelper, bundle.getProcessName(),
EntityType.PROCESS));
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index c9e373e..39f0268 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -93,7 +93,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
LOGGER.info(Util.prettyPrintXml(process.toString()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -125,7 +125,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- String processName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -163,7 +163,7 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- String processName = Util.readEntityName(bundles[0].getProcessData());
+ String processName = bundles[0].getProcessName();
LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
@@ -232,11 +232,11 @@ public class OptionalInputTest extends BaseTestClass {
LOGGER.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- LOGGER.info(Util.prettyPrintXml(process.toString()));
+
+ LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
- InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, bundles[0].getProcessName(),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
@@ -262,10 +262,8 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- LOGGER.info(Util.prettyPrintXml(process.toString()));
- String processName = process.getName();
- LOGGER.info(Util.prettyPrintXml(process.toString()));
+ LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String processName = bundles[0].getProcessName();
bundles[0].submitAndScheduleBundle(prism, true);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
@@ -278,11 +276,9 @@ public class OptionalInputTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(process);
- processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
- bundles[0].setProcessData(processMerlin.toString());
- bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
- process = new ProcessMerlin(bundles[0].getProcessData());
+ ProcessMerlin process = bundles[0].getProcessObject();
+ process.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
+ process.setProcessInputStartEnd("now(0,-10)", "now(0,0)");
LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString()));
prism.getProcessHelper().update(process.toString(), process.toString());
@@ -319,25 +315,22 @@ public class OptionalInputTest extends BaseTestClass {
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(4);
- ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
- String processName = process.getName();
+ ProcessMerlin process = bundles[0].getProcessObject();
LOGGER.info(Util.prettyPrintXml(process.toString()));
bundles[0].submitAndScheduleBundle(prism, true);
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
- inputPath + "/input1/", dataDates);
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
- 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ inputPath + "/input1/", dataDates);
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+ 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(process);
- processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
- bundles[0].setProcessData(processMerlin.toString());
- process = new ProcessMerlin(bundles[0].getProcessData());
+ process.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
+ bundles[0].setProcessData(process.toString());
//delete all input data
HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
@@ -347,7 +340,7 @@ public class OptionalInputTest extends BaseTestClass {
prism.getProcessHelper().update(process.toString(), process.toString());
//from now on ... it should wait of input0 also
- InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+ InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
}