You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/09 15:24:28 UTC

[1/3] falcon git commit: FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk

Repository: falcon
Updated Branches:
  refs/heads/master 8c7eaa69f -> 395675fb0


http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index 97d4e67..c6f72cc 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -201,38 +201,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:10Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2012-10-01T12:10Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("")
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2012-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("")
                 .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("")
-                .build()).toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source "
             + "is blank");
@@ -255,42 +255,39 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:00Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(100000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2099-10-01T12:10Z")
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(100000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2099-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(100000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        r = prism.getFeedHelper().schedule(feed);
+        r = prism.getFeedHelper().schedule(feed.toString());
         AssertUtil.assertSucceeded(r);
         TimeUtil.sleepSeconds(15);
 
@@ -302,19 +299,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/",
             testFile2);
 
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
                 "REPLICATION"), 1);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
 
 
@@ -356,53 +353,52 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:00Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2099-10-01T12:10Z")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2099-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed);
+        ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
                 "REPLICATION"), 1);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
                 "RETENTION"), 1);
 
 
@@ -449,48 +445,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:10Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, testBaseDir3 + MINUTE_DATE_PATTERN);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(testBaseDir3 + MINUTE_DATE_PATTERN);
 
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2012-10-01T12:10Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2012-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                .build());
 
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
         TimeUtil.sleepSeconds(15);
 
-        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 3,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 3,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
         //check if data has been replicated correctly
@@ -550,38 +546,37 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:10Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2012-10-01T12:10Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2012-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                .build());
 
         //clean target if old data exists
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         AssertUtil.assertFailed(r, "Submission of feed should have failed.");
         Assert.assertTrue(r.getMessage().contains(
                 "Partition expression has to be specified for cluster "
@@ -609,49 +604,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA2 = "2012-10-01T12:10Z";
 
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed,
-            testBaseDir1 + MINUTE_DATE_PATTERN);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN);
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(10000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2012-10-01T12:11Z")
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(10000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2012-10-01T12:26Z")
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(10000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("${cluster.colo}")
-                .build()).toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
         TimeUtil.sleepSeconds(15);
 
-        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
         //check if data has been replicated correctly
@@ -716,47 +708,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA1 = "2012-10-01T12:00Z";
         String startTimeUA2 = "2012-10-01T12:00Z";
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2099-10-01T12:10Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDirServer1Source + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2099-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(testBaseDir2 + "/replicated" + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build())
-            .toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        r = prism.getFeedHelper().schedule(feed);
+        r = prism.getFeedHelper().schedule(feed.toString());
         AssertUtil.assertSucceeded(r);
         TimeUtil.sleepSeconds(15);
 
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
         //check if data has been replicated correctly
@@ -804,49 +795,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA1 = "2012-10-01T12:05Z";
         String startTimeUA2 = "2012-10-01T12:10Z";
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, testBaseDir1 + MINUTE_DATE_PATTERN);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN);
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2099-10-01T12:10Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN + "/")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2099-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN + "/")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("${cluster.colo}")
                 .withDataLocation(testBaseDir4 + MINUTE_DATE_PATTERN + "/")
-                .build())
-            .toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
         TimeUtil.sleepSeconds(15);
-        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1,
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20);
 
         //check if data has been replicated correctly
@@ -905,38 +895,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
         String startTimeUA1 = "2012-10-01T12:05Z";
         String startTimeUA2 = "2012-10-01T12:10Z";
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA1, "2012-10-01T12:10Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("")
                 .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTimeUA2, "2012-10-01T12:25Z")
                 .withClusterType(ClusterType.TARGET)
                 .withPartition("")
                 .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z")
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("")
-                .build()).toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
+        ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString());
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertFailed(r, "is defined more than once for feed");
         Assert.assertTrue(r.getMessage().contains("is defined more than once for feed"));

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
index e1a96f3..6f60bb8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
@@ -101,8 +101,8 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
         bundles[0].setInputFeedDataPath(inputPath);
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
 
-        String feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.clearFeedClusters();
 
         // use the colo string here so that the test works in embedded and distributed mode.
         String postFix = "/US/" + cluster2Colo;
@@ -118,62 +118,62 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
 
         String startTime = TimeUtil.getTimeWrtSystemTime(-30);
 
-        feed = FeedMerlin.fromString(feed)
+        feed
             .addFeedCluster(new FeedMerlin.FeedClusterBuilder(
                 Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("hours(10)", ActionType.DELETE)
                 .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 85))
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("US/${cluster.colo}")
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("hours(10)", ActionType.DELETE)
                 .withValidity(TimeUtil.addMinsToTime(startTime, 20),
                     TimeUtil.addMinsToTime(startTime, 105))
                 .withClusterType(ClusterType.TARGET)
-                .build()).toString();
+                .build());
 
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withRetention("hours(10)", ActionType.DELETE)
                 .withValidity(TimeUtil.addMinsToTime(startTime, 40),
                     TimeUtil.addMinsToTime(startTime, 130))
                 .withClusterType(ClusterType.SOURCE)
                 .withPartition("UK/${cluster.colo}")
-                .build()).toString();
+                .build());
 
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString()));
 
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
 
         //change feed location path
-        feed = InstanceUtil.setFeedFilePath(feed, alternativeInputPath);
+        feed.setFilePath(alternativeInputPath);
 
-        LOGGER.info("updated feed: " + Util.prettyPrintXml(feed));
+        LOGGER.info("updated feed: " + Util.prettyPrintXml(feed.toString()));
 
         //update feed
-        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed, feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
 
         Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
-            Util.readEntityName(feed),
+            Util.readEntityName(feed.toString()),
             "REPLICATION"), 0);
         Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(),
-            Util.readEntityName(feed),
+            Util.readEntityName(feed.toString()),
             "RETENTION"), 2);
         Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
-            Util.readEntityName(feed),
+            Util.readEntityName(feed.toString()),
             "REPLICATION"), 0);
         Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(),
-            Util.readEntityName(feed),
+            Util.readEntityName(feed.toString()),
             "RETENTION"), 2);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()),
                 "REPLICATION"), 4);
         Assert.assertEquals(
-            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
+            InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()),
                 "RETENTION"), 2);
     }
 
@@ -209,11 +209,11 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
         feed02.setFeedPathValue(baseTestDir + "/feed02" + MINUTE_DATE_PATTERN);
 
         //generate data in both the colos ua1 and ua3
-        String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+        String prefix = feed01.getFeedPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
         HadoopUtil.lateDataReplenish(cluster1FS, 25, 1, prefix, null);
 
-        prefix = InstanceUtil.getFeedPrefix(feed02.toString());
+        prefix = feed02.getFeedPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
         HadoopUtil.lateDataReplenish(cluster3FS, 25, 1, prefix, null);
 
@@ -290,9 +290,9 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
             new ProcessMerlin.ProcessClusterBuilder(
                 Util.readEntityName(bundles[2].getClusters().get(0)))
                 .withValidity(processStartTime, processEndTime)
-                .build());
-        process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(),
-            feed02.toString(), feed02.getName()));
+                .build()
+        );
+        process.addInputFeed(feed02.getName(), feed02.getName());
 
         //submit and schedule process
         AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString()));
@@ -305,7 +305,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(serverOC.get(2), process.getName(), 1,
             Status.RUNNING, EntityType.PROCESS, timeout);
 
-        feed01 = new FeedMerlin(InstanceUtil.setFeedFilePath(feed01.toString(), alternativeInputPath));
+        feed01.setFilePath(alternativeInputPath);
         LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01.toString()));
         AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01.toString(), feed01.toString()));
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
index 483c281..d855e33 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -125,7 +125,7 @@ public class PrismFeedUpdateTest extends BaseTestClass {
         feed01.setFeedPathValue(baseTestDir + "/feed01" + MINUTE_DATE_PATTERN);
 
         //generate data in both the colos cluster1colo and cluster2colo
-        String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+        String prefix = feed01.getFeedPrefix();
         String startTime = TimeUtil.getTimeWrtSystemTime(-40);
         System.out.println("Start time = " + startTime);
         HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
@@ -183,9 +183,9 @@ public class PrismFeedUpdateTest extends BaseTestClass {
                 .build());
 
         //get 2nd process
-        ProcessMerlin process02 = new ProcessMerlin(InstanceUtil
-            .setProcessName(process01.toString(), this.getClass().getSimpleName()
-                + "-zeroInputProcess" + new Random().nextInt()));
+        ProcessMerlin process02 = new ProcessMerlin(process01);
+        process02.setName(this.getClass().getSimpleName() + "-zeroInputProcess"
+            + new Random().nextInt());
         List<String> feed = new ArrayList<String>();
         feed.add(outputFeed.toString());
         process02.setProcessFeeds(feed, 0, 0, 1);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
index f1ff8fe..e2f01c5 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
@@ -106,14 +106,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
         //now ensure that data has been deleted from all cluster store and is present in the
         // cluster archives
 
-        String clusterName = Util.readEntityName(bundle.getProcessData());
+        String processName = bundle.getProcessName();
         //prism:
-        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
         //UA1:
-        compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
-        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
 
         //UA2:
         compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -157,7 +157,7 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundle.getProcessData());
+            String processName = bundle.getProcessName();
             //prism:
             compareDataStoresForEquality(initialPrismStore, finalPrismStore);
             compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
@@ -177,16 +177,16 @@ public class PrismProcessDeleteTest extends BaseTestClass {
 
             HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
 
-            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
             compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
-                clusterName);
+                processName);
 
             compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
             compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
 
-            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
             compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
-                clusterName);
+                processName);
         } catch (Exception e) {
             LOGGER.info(e.getMessage());
             throw new TestNGException(e.getMessage());
@@ -290,18 +290,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundle.getProcessData());
+            String processName = bundle.getProcessName();
             //prism:
-            compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
             //UA2:
             compareDataStoresForEquality(initialUA2Store, finalUA2Store);
             compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
 
             //UA1:
-            compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
-            compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+            compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+            compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
 
         } catch (Exception e) {
             LOGGER.info(e.getMessage());
@@ -455,14 +455,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
         //now ensure that data has been deleted from all cluster store and is present in the
         // cluster archives
 
-        String clusterName = Util.readEntityName(bundle.getProcessData());
+        String processName = bundle.getProcessName();
         //prism:
-        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
         //UA1:
-        compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
-        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
 
         //UA2:
         compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -506,14 +506,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
         //now ensure that data has been deleted from all cluster store and is present in the
         // cluster archives
 
-        String clusterName = Util.readEntityName(bundles[0].getProcessData());
+        String processName = bundles[0].getProcessName();
         //prism:
-        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
         //UA1:
-        compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
-        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
 
         //UA2:
         compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -559,14 +559,14 @@ public class PrismProcessDeleteTest extends BaseTestClass {
         //now ensure that data has been deleted from all cluster store and is present in the
         // cluster archives
 
-        String clusterName = Util.readEntityName(bundle.getProcessData());
+        String processName = bundle.getProcessName();
         //prism:
-        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
         //UA1:
-        compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
-        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
 
         //UA2:
         compareDataStoresForEquality(initialUA2Store, finalUA2Store);
@@ -672,7 +672,7 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundles[0].getProcessData());
+            String processName = bundles[0].getProcessName();
             //prism:
             compareDataStoresForEquality(initialPrismStore, finalPrismStore);
             compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
@@ -694,13 +694,13 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
             compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
 
-            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
             compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
-                clusterName);
+                processName);
 
-            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
             compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
-                clusterName);
+                processName);
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -752,18 +752,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundle.getProcessData());
+            String processName = bundle.getProcessName();
             //prism:
-            compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
             //UA1:
             compareDataStoresForEquality(initialUA1Store, finalUA1Store);
             compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
 
             //UA2:
-            compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
-            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
         } catch (Exception e) {
             e.printStackTrace();
             throw new TestNGException(e.getMessage());
@@ -817,18 +817,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundle.getProcessData());
+            String processName = bundle.getProcessName();
             //prism:
-            compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
             //UA1:
             compareDataStoresForEquality(initialUA1Store, finalUA1Store);
             compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
 
             //UA2:
-            compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
-            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
         } catch (Exception e) {
             e.printStackTrace();
             throw new TestNGException(e.getMessage());
@@ -876,18 +876,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
             //now ensure that data has been deleted from all cluster store and is present in the
             // cluster archives
 
-            String clusterName = Util.readEntityName(bundles[1].getProcessData());
+            String processName = bundles[1].getProcessName();
             //prism:
-            compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
-            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
 
             //UA1:
             compareDataStoresForEquality(initialUA1Store, finalUA1Store);
             compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
 
             //UA2:
-            compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
-            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
 
 
             Util.startService(cluster2.getClusterHelper());
@@ -897,18 +897,18 @@ public class PrismProcessDeleteTest extends BaseTestClass {
 
             HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
 
-            clusterName = Util.readEntityName(bundles[0].getProcessData());
+            processName = bundles[0].getProcessName();
 
             compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
             compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
 
-            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
             compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
-                clusterName);
+                processName);
 
-            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
             compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
-                clusterName);
+                processName);
 
         } catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
index 03f380d..4555221 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -335,9 +335,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
 
             InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
             OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
-                    Util.readEntityName(bundles[0].getProcessData()), 0);
+                    bundles[0].getProcessName(), 0);
             InstanceUtil.waitTillInstanceReachState(cluster1OC,
-                    Util.readEntityName(bundles[0].getProcessData()), 2,
+                    bundles[0].getProcessName(), 2,
                     CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
 
             InstanceUtil.waitForBundleToReachState(cluster1,

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
index dfb405f..4aa7189 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -179,7 +179,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
         //reschedule trial
         AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(bundles[0].getProcessData()));
         Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+                bundles[0].getProcessName(), EntityType.PROCESS).size(), 1);
         AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
         AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
index 7bc4b5b..f90a76b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.regression.prism;
 
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -160,8 +159,7 @@ public class PrismSubmitTest extends BaseTestClass {
         List<String> afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
 
         AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
-        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
         AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
 
         Util.startService(cluster1.getClusterHelper());
@@ -179,8 +177,7 @@ public class PrismSubmitTest extends BaseTestClass {
         afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
 
         AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
-        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), -1);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), -1);
         AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
     }
 
@@ -219,10 +216,8 @@ public class PrismSubmitTest extends BaseTestClass {
         afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
         afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
 
-        AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
-        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+        AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, bundles[0].getProcessName(), 1);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
         AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
 
     }
@@ -566,8 +561,7 @@ public class PrismSubmitTest extends BaseTestClass {
         afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
 
         AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
-        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), 1);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1);
         AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
index 272ac3b..f407601 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.regression.prism;
 
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Property;
@@ -112,8 +111,7 @@ public class ProcessPartitionExpVariableTest extends BaseTestClass {
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir
             + "/input1/", dataDates);
 
-        InstanceUtil.waitTillInstanceReachState(clusterOC,
-            new ProcessMerlin(bundles[0].getProcessData()).getName(), 2,
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
index 1d65d12..2a73538 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
@@ -23,7 +23,6 @@ import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
@@ -76,8 +75,9 @@ public class RescheduleKilledProcessTest extends BaseTestClass {
     public void rescheduleKilledProcess() throws Exception {
         String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
         String processEndTime = TimeUtil.getTimeWrtSystemTime(6);
-        ProcessMerlin process = new ProcessMerlin(InstanceUtil.setProcessName(bundles[0].getProcessData(),
-            this.getClass().getSimpleName() + "zeroInputProcess" + new Random().nextInt()));
+        ProcessMerlin process = bundles[0].getProcessObject();
+        process.setName(this.getClass().getSimpleName() + "-zeroInputProcess"
+            + new Random().nextInt());
         List<String> feed = new ArrayList<String>();
         feed.add(bundles[0].getOutputFeedFromBundle());
         process.setProcessFeeds(feed, 0, 0, 1);
@@ -87,7 +87,8 @@ public class RescheduleKilledProcessTest extends BaseTestClass {
             new ProcessMerlin.ProcessClusterBuilder(
                 Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withValidity(processStartTime, processEndTime)
-                .build());
+                .build()
+        );
         bundles[0].setProcessData(process.toString());
 
         bundles[0].submitFeedsScheduleProcess(prism);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
index 7e4422b..8d7ac7f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
@@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -159,7 +158,7 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
         prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+            .getProcessInstanceKill(bundles[0].getProcessName(),
                 "?start=2010-01-02T01:05Z&end=2010-01-02T01:11Z");
         InstanceUtil
             .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index 0cc0d6e..14d76af 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -126,9 +126,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
 
         //update frequency
-        Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes);
-        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
-        ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null);
+        FeedMerlin updatedFeed = new FeedMerlin(feed);
+        updatedFeed.setFrequency(new Frequency("21", Frequency.TimeUnit.minutes));
+        ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed.toString(), "abc", null);
         Assert.assertTrue(r.getMessage()
             .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
     }
@@ -182,9 +182,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
 
         //update frequency
-        Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes);
-        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
-        r = prism.getFeedHelper().update(feed, updatedFeed,
+        FeedMerlin updatedFeed = new FeedMerlin(feed);
+        updatedFeed.setFrequency(new Frequency("7", Frequency.TimeUnit.minutes));
+        r = prism.getFeedHelper().update(feed, updatedFeed.toString(),
             TimeUtil.getTimeWrtSystemTime(-10000), null);
         AssertUtil.assertSucceeded(r);
         InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
index 4c54409..040057e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@ -141,7 +141,7 @@ public class ProcessUITest extends BaseUITestClass {
                 Generator.getHadoopPathGenerator(feedInputPath, MINUTE_DATE_PATTERN));
         int j = 0;
         for (FeedMerlin feed : inputFeeds) {
-            bundles[0].addInputFeedToBundle("inputFeed" + j, feed.toString(), j++);
+            bundles[0].addInputFeedToBundle("inputFeed" + j++, feed);
         }
 
         outputFeeds = LineageApiTest.generateFeeds(numOutputFeeds, outputMerlin,
@@ -150,7 +150,7 @@ public class ProcessUITest extends BaseUITestClass {
                 Generator.getHadoopPathGenerator(feedOutputPath, MINUTE_DATE_PATTERN));
         j = 0;
         for (FeedMerlin feed : outputFeeds) {
-            bundles[0].addOutputFeedToBundle("outputFeed" + j, feed.toString(), j++);
+            bundles[0].addOutputFeedToBundle("outputFeed" + j++, feed);
         }
 
         AssertUtil.assertSucceeded(bundles[0].submitBundle(prism));


[3/3] falcon git commit: FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk

Posted by ro...@apache.org.
FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/395675fb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/395675fb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/395675fb

Branch: refs/heads/master
Commit: 395675fb0f4096dc44f1e325681536328956440e
Parents: 8c7eaa6
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Tue Feb 3 15:18:27 2015 +0200
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Thu Apr 9 16:22:05 2015 +0300

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../regression/Entities/ClusterMerlin.java      |  26 ++
 .../falcon/regression/Entities/FeedMerlin.java  |  46 ++++
 .../regression/Entities/ProcessMerlin.java      | 145 +++++++++++
 .../falcon/regression/core/bundle/Bundle.java   | 232 +++---------------
 .../falcon/regression/core/util/BundleUtil.java |   2 +-
 .../regression/core/util/InstanceUtil.java      | 102 --------
 .../falcon/regression/AuthorizationTest.java    | 126 +++++-----
 .../regression/ELExpCurrentAndLastWeekTest.java |   3 +-
 .../falcon/regression/ELValidationsTest.java    |   3 +-
 .../falcon/regression/ExternalFSTest.java       |  24 +-
 .../regression/FeedInstanceListingTest.java     |   2 +-
 .../falcon/regression/FeedLateRerunTest.java    |  20 +-
 .../falcon/regression/FeedReplicationTest.java  | 162 ++++++-------
 .../apache/falcon/regression/LogMoverTest.java  |  10 +-
 .../apache/falcon/regression/NewRetryTest.java  |  59 ++---
 .../falcon/regression/NoOutputProcessTest.java  |   3 +-
 .../falcon/regression/ProcessFrequencyTest.java |   3 +-
 .../ProcessInstanceColoMixedTest.java           |  27 +--
 .../regression/ProcessInstanceKillsTest.java    |   3 +-
 .../regression/ProcessInstanceResumeTest.java   |   3 +-
 .../regression/ProcessInstanceRunningTest.java  |   3 +-
 .../regression/ProcessInstanceStatusTest.java   |   3 +-
 .../regression/ProcessInstanceSuspendTest.java  |   8 +-
 .../falcon/regression/ProcessLateRerunTest.java |  19 +-
 .../falcon/regression/ProcessSLATest.java       |  10 +-
 .../ValidateAPIPrismAndServerTest.java          |   4 +-
 .../entity/EntitiesPatternSearchTest.java       |  12 +-
 .../regression/entity/ListEntitiesTest.java     |   2 +-
 .../falcon/regression/hcat/HCatProcessTest.java |  11 +-
 .../prism/NewPrismProcessUpdateTest.java        | 151 +++++-------
 .../regression/prism/OptionalInputTest.java     |  45 ++--
 .../PrismFeedReplicationPartitionExpTest.java   | 240 +++++++++----------
 .../prism/PrismFeedReplicationUpdateTest.java   |  52 ++--
 .../regression/prism/PrismFeedUpdateTest.java   |   8 +-
 .../prism/PrismProcessDeleteTest.java           | 110 ++++-----
 .../prism/PrismProcessScheduleTest.java         |   4 +-
 .../regression/prism/PrismProcessSnSTest.java   |   2 +-
 .../regression/prism/PrismSubmitTest.java       |  16 +-
 .../prism/ProcessPartitionExpVariableTest.java  |   4 +-
 .../prism/RescheduleKilledProcessTest.java      |   9 +-
 .../RescheduleProcessInFinalStatesTest.java     |   3 +-
 .../prism/UpdateAtSpecificTimeTest.java         |  12 +-
 .../falcon/regression/ui/ProcessUITest.java     |   4 +-
 44 files changed, 808 insertions(+), 927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 2d0e4c3..76c1090 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,8 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and
+   Bundle.java (Ruslan Ostafiychuk)
 
    FALCON-1088 Fixing FeedDelayParallelTimeoutTest and renaming it to FeedDelayTest(Pragya M via 
    Samarth G)

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
index 22ec5da..a99b307 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
@@ -23,11 +23,17 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
 import org.testng.Assert;
 
 import javax.xml.bind.JAXBException;
 import java.io.StringWriter;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Class for representing a cluster xml. */
@@ -78,4 +84,24 @@ public class ClusterMerlin extends Cluster {
         acl.setPermission(permission);
         this.setACL(acl);
     }
+
+    public void setInterface(Interfacetype interfacetype, String value) {
+        final Interfaces interfaces = this.getInterfaces();
+        final List<Interface> interfaceList = interfaces.getInterfaces();
+        for (final Interface anInterface : interfaceList) {
+            if (anInterface.getType() == interfacetype) {
+                anInterface.setEndpoint(value);
+            }
+        }
+    }
+
+    public void setWorkingLocationPath(String path) {
+        for (Location location : getLocations().getLocations()) {
+            if (location.getName() == ClusterLocationType.WORKING) {
+                location.setPath(path);
+                break;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
index 70e2e73..1b59227 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
@@ -328,4 +328,50 @@ public class FeedMerlin extends Feed {
         this.setSla(sla);
     }
 
+    /**
+     * Sets new feed data path (for first location).
+     *
+     * @param path new feed data path
+     */
+    public void setFilePath(String path) {
+        getLocations().getLocations().get(0).setPath(path);
+    }
+
+
+    /**
+     * Retrieves prefix (main sub-folders) of first feed data path.
+     */
+    public String getFeedPrefix() {
+        String path = getLocations().getLocations().get(0).getPath();
+        return path.substring(0, path.indexOf('$'));
+    }
+
+    public void setValidity(String feedStart, String feedEnd) {
+        this.getClusters().getClusters().get(0).getValidity()
+            .setStart(TimeUtil.oozieDateToDate(feedStart).toDate());
+        this.getClusters().getClusters().get(0).getValidity()
+            .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate());
+
+    }
+
+    public void setDataLocationPath(String path) {
+        final List<Location> locations = this.getLocations().getLocations();
+        for (Location location : locations) {
+            if (location.getType() == LocationType.DATA) {
+                location.setPath(path);
+            }
+        }
+    }
+
+    public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) {
+        Frequency frq = new Frequency(String.valueOf(frequency), periodicity);
+        this.setFrequency(frq);
+    }
+
+    public void setTableUri(String tableUri) {
+        final CatalogTable catalogTable = new CatalogTable();
+        catalogTable.setUri(tableUri);
+        this.setTable(catalogTable);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 01fdd04..8732a44 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Sla;
 import org.apache.falcon.entity.v0.process.ACL;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -35,6 +36,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Properties;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.testng.Assert;
 
@@ -268,6 +270,149 @@ public class ProcessMerlin extends Process {
         this.setSla(sla);
     }
 
+    /**
+     * Sets new process validity on all the process clusters.
+     *
+     * @param startTime start of process validity
+     * @param endTime   end of process validity
+     */
+    public void setValidity(String startTime, String endTime) {
+
+        for (Cluster cluster : this.getClusters().getClusters()) {
+            cluster.getValidity().setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+            cluster.getValidity().setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+        }
+    }
+
+    /**
+     * Adds one output into process.
+     */
+    public void addOutputFeed(String outputName, String feedName) {
+        Output out1 = getOutputs().getOutputs().get(0);
+        Output out2 = new Output();
+        out2.setFeed(feedName);
+        out2.setName(outputName);
+        out2.setInstance(out1.getInstance());
+        getOutputs().getOutputs().add(out2);
+    }
+
+
+
+    /**
+     * Adds one input into process.
+     */
+    public void addInputFeed(String inputName, String feedName) {
+        Input in1 = getInputs().getInputs().get(0);
+        Input in2 = new Input();
+        in2.setEnd(in1.getEnd());
+        in2.setFeed(feedName);
+        in2.setName(inputName);
+        in2.setPartition(in1.getPartition());
+        in2.setStart(in1.getStart());
+        in2.setOptional(in1.isOptional());
+        getInputs().getInputs().add(in2);
+    }
+
+
+    public void setInputFeedWithEl(String inputFeedName, String startEl, String endEl) {
+        Inputs inputs = new Inputs();
+        Input input = new Input();
+        input.setFeed(inputFeedName);
+        input.setStart(startEl);
+        input.setEnd(endEl);
+        input.setName("inputData");
+        inputs.getInputs().add(input);
+        this.setInputs(inputs);
+    }
+
+    public void setDatasetInstances(String startInstance, String endInstance) {
+        this.getInputs().getInputs().get(0).setStart(startInstance);
+        this.getInputs().getInputs().get(0).setEnd(endInstance);
+    }
+
+    public void setProcessInputStartEnd(String start, String end) {
+        for (Input input : this.getInputs().getInputs()) {
+            input.setStart(start);
+            input.setEnd(end);
+        }
+    }
+
+    /**
+     * Sets name(s) of the process output(s).
+     *
+     * @param names new names of the outputs
+     */
+    public void setOutputNames(String... names) {
+        Outputs outputs = this.getOutputs();
+        Assert.assertEquals(outputs.getOutputs().size(), names.length,
+            "Number of output names is not equal to number of outputs in process");
+        for (int i = 0; i < names.length; i++) {
+            outputs.getOutputs().get(i).setName(names[i]);
+        }
+        this.setOutputs(outputs);
+    }
+
+
+    /**
+     * Sets partition for each input, according to number of supplied partitions.
+     *
+     * @param partition partitions to be set
+     */
+    public void setInputPartition(String... partition) {
+        for (int i = 0; i < partition.length; i++) {
+            this.getInputs().getInputs().get(i).setPartition(partition[i]);
+        }
+    }
+
+    /**
+     * Adds optional property to process definition.
+     *
+     * @param properties desired properties to be added
+     */
+    public void addProperties(Property... properties) {
+        for (Property property : properties) {
+            this.getProperties().getProperties().add(property);
+        }
+    }
+
+    /**
+     * Changes names of process inputs.
+     *
+     * @param names desired names of inputs
+     */
+    public void setInputNames(String... names) {
+        for (int i = 0; i < names.length; i++) {
+            this.getInputs().getInputs().get(i).setName(names[i]);
+        }
+    }
+
+    public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) {
+        Frequency frq = new Frequency(String.valueOf(frequency), periodicity);
+        this.setFrequency(frq);
+    }
+
+    public void setTimeOut(int magnitude, Frequency.TimeUnit unit) {
+        Frequency frq = new Frequency(String.valueOf(magnitude), unit);
+        this.setTimeout(frq);
+    }
+
+
+
+    public void setWorkflow(String wfPath, String libPath, EngineType engineType) {
+        Workflow w = this.getWorkflow();
+        if (engineType != null) {
+            w.setEngine(engineType);
+        }
+        if (libPath != null) {
+            w.setLib(libPath);
+        }
+        w.setPath(wfPath);
+        this.setWorkflow(w);
+    }
+
+    public String getFirstInputName() {
+        return getInputs().getInputs().get(0).getName();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index b0fa0a5..2f43972 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -21,24 +21,14 @@ package org.apache.falcon.regression.core.bundle;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Retry;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.regression.Entities.ClusterMerlin;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.Entities.ProcessMerlin;
@@ -50,9 +40,6 @@ import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 import org.testng.Assert;
 
 import javax.xml.bind.JAXBException;
@@ -228,7 +215,7 @@ public class Bundle {
      */
     public void generateUniqueBundle(String prefix) {
         /* creating new names */
-        List<ClusterMerlin> clusterMerlinList = BundleUtil.fromString(clusters);
+        List<ClusterMerlin> clusterMerlinList = BundleUtil.getClustersFromStrings(clusters);
         Map<String, String> clusterNameMap = new HashMap<String, String>();
         for (ClusterMerlin clusterMerlin : clusterMerlinList) {
             clusterNameMap.putAll(clusterMerlin.setUniqueName(prefix));
@@ -309,14 +296,7 @@ public class Bundle {
      */
     public void setProcessInput(String startEl, String endEl) {
         ProcessMerlin process = getProcessObject();
-        Inputs inputs = new Inputs();
-        Input input = new Input();
-        input.setFeed(Util.readEntityName(getInputFeedFromBundle()));
-        input.setStart(startEl);
-        input.setEnd(endEl);
-        input.setName("inputData");
-        inputs.getInputs().add(input);
-        process.setInputs(inputs);
+        process.setInputFeedWithEl(Util.readEntityName(getInputFeedFromBundle()), startEl, endEl);
         this.setProcessData(process.toString());
     }
 
@@ -342,10 +322,7 @@ public class Bundle {
 
     public void setFeedValidity(String feedStart, String feedEnd, String feedName) {
         FeedMerlin feedElement = getFeedElement(feedName);
-        feedElement.getClusters().getClusters().get(0).getValidity()
-            .setStart(TimeUtil.oozieDateToDate(feedStart).toDate());
-        feedElement.getClusters().getClusters().get(0).getValidity()
-            .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate());
+        feedElement.setValidity(feedStart, feedEnd);
         writeFeedElement(feedElement, feedName);
     }
 
@@ -377,24 +354,19 @@ public class Bundle {
 
     public void setDatasetInstances(String startInstance, String endInstance) {
         ProcessMerlin processElement = getProcessObject();
-        processElement.getInputs().getInputs().get(0).setStart(startInstance);
-        processElement.getInputs().getInputs().get(0).setEnd(endInstance);
+        processElement.setDatasetInstances(startInstance, endInstance);
         setProcessData(processElement.toString());
     }
 
     public void setProcessPeriodicity(int frequency, TimeUnit periodicity) {
         ProcessMerlin processElement = getProcessObject();
-        Frequency frq = new Frequency("" + frequency, periodicity);
-        processElement.setFrequency(frq);
+        processElement.setPeriodicity(frequency, periodicity);
         setProcessData(processElement.toString());
     }
 
     public void setProcessInputStartEnd(String start, String end) {
         ProcessMerlin processElement = getProcessObject();
-        for (Input input : processElement.getInputs().getInputs()) {
-            input.setStart(start);
-            input.setEnd(end);
-        }
+        processElement.setProcessInputStartEnd(start, end);
         setProcessData(processElement.toString());
     }
 
@@ -421,23 +393,10 @@ public class Bundle {
     }
 
     public void setOutputFeedLocationData(String path) {
-        ProcessMerlin processElement = new ProcessMerlin(processData);
-        String outputDataset = null;
-        int datasetIndex;
-        for (datasetIndex = 0; datasetIndex < dataSets.size(); datasetIndex++) {
-            outputDataset = dataSets.get(datasetIndex);
-            if (outputDataset.contains(processElement.getOutputs().getOutputs().get(0).getFeed())) {
-                break;
-            }
-        }
-
-        FeedMerlin feedElement = new FeedMerlin(outputDataset);
-        Location l = new Location();
-        l.setPath(path);
-        l.setType(LocationType.DATA);
-        feedElement.getLocations().getLocations().set(0, l);
-        dataSets.set(datasetIndex, feedElement.toString());
-        LOGGER.info("modified location path dataSet is: " + dataSets.get(datasetIndex));
+        FeedMerlin feedElement = getFeedElement(getOutputFeedNameFromBundle());
+        feedElement.setDataLocationPath(path);
+        writeFeedElement(feedElement, feedElement.getName());
+        LOGGER.info("modified location path dataSet is: " + feedElement);
     }
 
     public void setProcessConcurrency(int concurrency) {
@@ -456,15 +415,7 @@ public class Bundle {
 
     public void setProcessWorkflow(String wfPath, String libPath, EngineType engineType) {
         ProcessMerlin processElement = getProcessObject();
-        Workflow w = processElement.getWorkflow();
-        if (engineType != null) {
-            w.setEngine(engineType);
-        }
-        if (libPath != null) {
-            w.setLib(libPath);
-        }
-        w.setPath(wfPath);
-        processElement.setWorkflow(w);
+        processElement.setWorkflow(wfPath, libPath, engineType);
         setProcessData(processElement.toString());
     }
 
@@ -501,8 +452,7 @@ public class Bundle {
     public void setInputFeedPeriodicity(int frequency, TimeUnit periodicity) {
         String feedName = getInputFeedNameFromBundle();
         FeedMerlin feedElement = getFeedElement(feedName);
-        Frequency frq = new Frequency("" + frequency, periodicity);
-        feedElement.setFrequency(frq);
+        feedElement.setPeriodicity(frequency, periodicity);
         writeFeedElement(feedElement, feedName);
 
     }
@@ -520,12 +470,7 @@ public class Bundle {
     public void setInputFeedDataPath(String path) {
         String feedName = getInputFeedNameFromBundle();
         FeedMerlin feedElement = getFeedElement(feedName);
-        final List<Location> locations = feedElement.getLocations().getLocations();
-        for (Location location : locations) {
-            if (location.getType() == LocationType.DATA) {
-                locations.get(0).setPath(path);
-            }
-        }
+        feedElement.setDataLocationPath(path);
         writeFeedElement(feedElement, feedName);
     }
 
@@ -536,40 +481,9 @@ public class Bundle {
             .getPath());
     }
 
-    public void setProcessValidity(DateTime startDate, DateTime endDate) {
-
-        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-
-        String start = formatter.print(startDate).replace("/", "T") + "Z";
-        String end = formatter.print(endDate).replace("/", "T") + "Z";
-
-        ProcessMerlin processElement = new ProcessMerlin(processData);
-
-        for (Cluster cluster : processElement.getClusters().getClusters()) {
-
-            org.apache.falcon.entity.v0.process.Validity validity =
-                new org.apache.falcon.entity.v0.process.Validity();
-            validity.setStart(TimeUtil.oozieDateToDate(start).toDate());
-            validity.setEnd(TimeUtil.oozieDateToDate(end).toDate());
-            cluster.setValidity(validity);
-
-        }
-
-        processData = processElement.toString();
-    }
-
     public void setProcessValidity(String startDate, String endDate) {
         ProcessMerlin processElement = new ProcessMerlin(processData);
-
-        for (Cluster cluster : processElement.getClusters().getClusters()) {
-            org.apache.falcon.entity.v0.process.Validity validity =
-                new org.apache.falcon.entity.v0.process.Validity();
-            validity.setStart(TimeUtil.oozieDateToDate(startDate).toDate());
-            validity.setEnd(TimeUtil.oozieDateToDate(endDate).toDate());
-            cluster.setValidity(validity);
-
-        }
-
+        processElement.setValidity(startDate, endDate);
         processData = processElement.toString();
     }
 
@@ -603,16 +517,9 @@ public class Bundle {
         }
     }
 
-    public void addProcessInput(String feed, String feedName) {
+    public void addProcessInput(String inputName, String feedName) {
         ProcessMerlin processElement = getProcessObject();
-        Input in1 = processElement.getInputs().getInputs().get(0);
-        Input in2 = new Input();
-        in2.setEnd(in1.getEnd());
-        in2.setFeed(feed);
-        in2.setName(feedName);
-        in2.setPartition(in1.getPartition());
-        in2.setStart(in1.getStart());
-        processElement.getInputs().getInputs().add(in2);
+        processElement.addInputFeed(inputName, feedName);
         setProcessData(processElement.toString());
     }
 
@@ -654,43 +561,27 @@ public class Bundle {
 
     public void setClusterInterface(Interfacetype interfacetype, String value) {
         ClusterMerlin c = getClusterElement();
-        final Interfaces interfaces = c.getInterfaces();
-        final List<Interface> interfaceList = interfaces.getInterfaces();
-        for (final Interface anInterface : interfaceList) {
-            if (anInterface.getType() == interfacetype) {
-                anInterface.setEndpoint(value);
-            }
-        }
+        c.setInterface(interfacetype, value);
         writeClusterElement(c);
     }
 
     public void setInputFeedTableUri(String tableUri) {
         final String feedStr = getInputFeedFromBundle();
         FeedMerlin feed = new FeedMerlin(feedStr);
-        final CatalogTable catalogTable = new CatalogTable();
-        catalogTable.setUri(tableUri);
-        feed.setTable(catalogTable);
+        feed.setTableUri(tableUri);
         writeFeedElement(feed, feed.getName());
     }
 
     public void setOutputFeedTableUri(String tableUri) {
         final String feedStr = getOutputFeedFromBundle();
         FeedMerlin feed = new FeedMerlin(feedStr);
-        final CatalogTable catalogTable = new CatalogTable();
-        catalogTable.setUri(tableUri);
-        feed.setTable(catalogTable);
+        feed.setTableUri(tableUri);
         writeFeedElement(feed, feed.getName());
     }
 
     public void setCLusterWorkingPath(String clusterData, String path) {
         ClusterMerlin c = new ClusterMerlin(clusterData);
-        for (int i = 0; i < c.getLocations().getLocations().size(); i++) {
-            if (c.getLocations().getLocations().get(i).getName().equals(ClusterLocationType.WORKING)) {
-                c.getLocations().getLocations().get(i).setPath(path);
-            }
-        }
-
-        //this.setClusterData(clusterData)
+        c.setWorkingLocationPath(path);
         writeClusterElement(c);
     }
 
@@ -795,17 +686,13 @@ public class Bundle {
 
     public void setProcessLibPath(String libPath) {
         ProcessMerlin processElement = getProcessObject();
-        Workflow wf = processElement.getWorkflow();
-        wf.setLib(libPath);
-        processElement.setWorkflow(wf);
+        processElement.getWorkflow().setLib(libPath);
         setProcessData(processElement.toString());
-
     }
 
     public void setProcessTimeOut(int magnitude, TimeUnit unit) {
         ProcessMerlin processElement = getProcessObject();
-        Frequency frq = new Frequency("" + magnitude, unit);
-        processElement.setTimeout(frq);
+        processElement.setTimeOut(magnitude, unit);
         setProcessData(processElement.toString());
     }
 
@@ -901,9 +788,7 @@ public class Bundle {
      */
     public void setProcessInputNames(String... names) {
         ProcessMerlin p = new ProcessMerlin(processData);
-        for (int i = 0; i < names.length; i++) {
-            p.getInputs().getInputs().get(i).setName(names[i]);
-        }
+        p.setInputNames(names);
         processData = p.toString();
     }
 
@@ -914,9 +799,7 @@ public class Bundle {
      */
     public void addProcessProperty(Property... properties) {
         ProcessMerlin p = new ProcessMerlin(processData);
-        for (Property property : properties) {
-            p.getProperties().getProperties().add(property);
-        }
+        p.addProperties(properties);
         processData = p.toString();
     }
 
@@ -927,9 +810,7 @@ public class Bundle {
      */
     public void setProcessInputPartition(String... partition) {
         ProcessMerlin p = new ProcessMerlin(processData);
-        for (int i = 0; i < partition.length; i++) {
-            p.getInputs().getInputs().get(i).setPartition(partition[i]);
-        }
+        p.setInputPartition(partition);
         processData = p.toString();
     }
 
@@ -940,46 +821,23 @@ public class Bundle {
      */
     public void setProcessOutputNames(String... names) {
         ProcessMerlin p = new ProcessMerlin(processData);
-        Outputs outputs = p.getOutputs();
-        Assert.assertEquals(outputs.getOutputs().size(), names.length,
-                "Number of output names is not equal to number of outputs in process");
-        for (int i = 0; i < names.length; i++) {
-            outputs.getOutputs().get(i).setName(names[i]);
-        }
-        p.setOutputs(outputs);
+        p.setOutputNames(names);
         processData = p.toString();
     }
 
-    public void addInputFeedToBundle(String feedRefName, String feed, int templateInputIdx) {
-        this.getDataSets().add(feed);
-        String feedName = Util.readEntityName(feed);
-        String vProcessData = getProcessData();
-
-        ProcessMerlin processObject = new ProcessMerlin(vProcessData);
-        final List<Input> processInputs = processObject.getInputs().getInputs();
-        Input templateInput = processInputs.get(templateInputIdx);
-        Input newInput = new Input();
-        newInput.setFeed(feedName);
-        newInput.setName(feedRefName);
-        newInput.setOptional(templateInput.isOptional());
-        newInput.setStart(templateInput.getStart());
-        newInput.setEnd(templateInput.getEnd());
-        newInput.setPartition(templateInput.getPartition());
-        processInputs.add(newInput);
+    public void addInputFeedToBundle(String feedRefName, Feed feed) {
+        this.getDataSets().add(feed.toString());
+
+        ProcessMerlin processObject = new ProcessMerlin(processData);
+        processObject.addInputFeed(feedRefName, feed.getName());
         setProcessData(processObject.toString());
     }
 
-    public void addOutputFeedToBundle(String feedRefName, String feed, int templateOutputIdx) {
-        this.getDataSets().add(feed);
-        String feedName = Util.readEntityName(feed);
+    public void addOutputFeedToBundle(String feedRefName, Feed feed) {
+        this.getDataSets().add(feed.toString());
+
         ProcessMerlin processObject = getProcessObject();
-        final List<Output> processOutputs = processObject.getOutputs().getOutputs();
-        Output templateOutput = processOutputs.get(templateOutputIdx);
-        Output newOutput = new Output();
-        newOutput.setFeed(feedName);
-        newOutput.setName(feedRefName);
-        newOutput.setInstance(templateOutput.getInstance());
-        processOutputs.add(newOutput);
+        processObject.addOutputFeed(feedRefName, feed.getName());
         setProcessData(processObject.toString());
     }
 
@@ -1000,26 +858,12 @@ public class Bundle {
 
     public String getInputFeedFromBundle() {
         ProcessMerlin processObject = new ProcessMerlin(getProcessData());
-        for (Input input : processObject.getInputs().getInputs()) {
-            for (String feed : getDataSets()) {
-                if (Util.readEntityName(feed).equalsIgnoreCase(input.getFeed())) {
-                    return feed;
-                }
-            }
-        }
-        return null;
+        return getFeed(processObject.getInputs().getInputs().get(0).getFeed());
     }
 
     public String getOutputFeedFromBundle() {
         ProcessMerlin processObject = new ProcessMerlin(getProcessData());
-        for (Output output : processObject.getOutputs().getOutputs()) {
-            for (String feed : getDataSets()) {
-                if (Util.readEntityName(feed).equalsIgnoreCase(output.getFeed())) {
-                    return feed;
-                }
-            }
-        }
-        return null;
+        return getFeed(processObject.getOutputs().getOutputs().get(0).getFeed());
     }
 
     public String getOutputFeedNameFromBundle() {

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 0b2c4e1..a825845 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -237,7 +237,7 @@ public final class BundleUtil {
         return property;
     }
 
-    public static List<ClusterMerlin> fromString(List<String> clusterStrings) {
+    public static List<ClusterMerlin> getClustersFromStrings(List<String> clusterStrings) {
         List<ClusterMerlin> clusters = new ArrayList<ClusterMerlin>();
         for (String clusterString : clusterStrings) {
             clusters.add(new ClusterMerlin(clusterString));

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 4620787..6c90256 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -26,10 +26,6 @@ import com.google.gson.JsonSyntaxException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
@@ -527,37 +523,6 @@ public final class InstanceUtil {
         return InstanceUtil.sendRequestProcessInstance(url, user);
     }
 
-    /**
-     * Retrieves prefix (main sub-folders) of feed data path.
-     */
-    public static String getFeedPrefix(String feed) {
-        FeedMerlin feedElement = new FeedMerlin(feed);
-        String locationPath = feedElement.getLocations().getLocations().get(0).getPath();
-        locationPath = locationPath.substring(0, locationPath.indexOf('$'));
-        return locationPath;
-    }
-
-    /**
-     * Adds one input into process.
-     *
-     * @param process - where input should be inserted
-     * @param feed    - feed which will be used as input feed
-     * @return - string representation of process definition
-     */
-    public static String addProcessInputFeed(String process, String feed, String feedName) {
-
-        ProcessMerlin processElement = new ProcessMerlin(process);
-        Input in1 = processElement.getInputs().getInputs().get(0);
-        Input in2 = new Input();
-        in2.setEnd(in1.getEnd());
-        in2.setFeed(feed);
-        in2.setName(feedName);
-        in2.setPartition(in1.getPartition());
-        in2.setStart(in1.getStart());
-        processElement.getInputs().getInputs().add(in2);
-        return processElement.toString();
-    }
-
     public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord(
             ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
         OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
@@ -598,21 +563,6 @@ public final class InstanceUtil {
         return actionInfo.getRun();
     }
 
-
-    /**
-     * Sets new feed data path.
-     *
-     * @param feed feed which is to be modified
-     * @param path new feed data path
-     * @return modified feed
-     */
-    public static String setFeedFilePath(String feed, String path) {
-
-        FeedMerlin feedElement = new FeedMerlin(feed);
-        feedElement.getLocations().getLocations().get(0).setPath(path);
-        return feedElement.toString();
-    }
-
     public static int checkIfFeedCoordExist(AbstractEntityHelper helper,
             String feedName, String coordType) throws OozieClientException {
         LOGGER.info("feedName: " + feedName);
@@ -640,47 +590,6 @@ public final class InstanceUtil {
         return numberOfCoord;
     }
 
-    /**
-     * Sets process frequency.
-     *
-     * @return modified process definition
-     */
-    public static String setProcessFrequency(String process, Frequency frequency) {
-        ProcessMerlin p = new ProcessMerlin(process);
-        p.setFrequency(frequency);
-        return p.toString();
-    }
-
-    /**
-     * Sets new process name.
-     */
-    public static String setProcessName(String process, String newName) {
-        ProcessMerlin p = new ProcessMerlin(process);
-        p.setName(newName);
-        return p.toString();
-    }
-
-    /**
-     * Sets new process validity on all the process clusters.
-     *
-     * @param process   process entity to be modified
-     * @param startTime start of process validity
-     * @param endTime   end of process validity
-     * @return modified process definition
-     */
-    public static String setProcessValidity(String process,
-            String startTime, String endTime) {
-        ProcessMerlin processElement = new ProcessMerlin(process);
-
-        for (int i = 0; i < processElement.getClusters().getClusters().size(); i++) {
-            processElement.getClusters().getClusters().get(i).getValidity().setStart(
-                    TimeUtil.oozieDateToDate(startTime).toDate());
-            processElement.getClusters().getClusters().get(i).getValidity()
-                    .setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
-        }
-        return processElement.toString();
-    }
-
     public static List<CoordinatorAction> getProcessInstanceListFromAllBundles(
             ColoHelper coloHelper, String processName, EntityType entityType)
         throws OozieClientException {
@@ -950,17 +859,6 @@ public final class InstanceUtil {
     }
 
     /**
-     * Sets feed frequency.
-     *
-     * @return modified feed
-     */
-    public static String setFeedFrequency(String feed, Frequency f) {
-        FeedMerlin feedElement = new FeedMerlin(feed);
-        feedElement.setFrequency(f);
-        return feedElement.toString();
-    }
-
-    /**
      * Waits till instances of specific job will be created during specific time.
      * Use this method directly in unusual test cases where timeouts are different from trivial.
      * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index eaa69f0..02280f3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -287,36 +287,32 @@ public class AuthorizationTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
 
         //check that there are 3 running instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
         //check that there are 2 waiting instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
+                CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
         //3 instances should be running , other 2 should be waiting
-        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
-                .readEntityName(bundles[0].getProcessData()),
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
 
         //suspend 3 running instances
-        r = prism.getProcessHelper().getProcessInstanceSuspend(Util
-                .readEntityName(bundles[0].getProcessData()),
+        r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + midTime);
         InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
 
         //try to resume suspended instances by U2
-        r = prism.getProcessHelper().getProcessInstanceResume(Util.readEntityName(bundles[0]
-                .getProcessData()), "?start=" + startTime + "&end=" + midTime,
-            MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceResume(bundles[0].getProcessName(), "?start=" + startTime
+                + "&end=" + midTime, MerlinConstants.USER2_NAME);
 
         //the state of above 3 instances should still be suspended
         InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
 
         //check the status of all instances
-        r = prism.getProcessHelper().getProcessInstanceStatus(Util
-                .readEntityName(bundles[0].getProcessData()),
+        r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0);
     }
@@ -353,18 +349,16 @@ public class AuthorizationTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
 
         //check that there are 3 running instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
         //3 instances should be running , other 2 should be waiting
-        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
-                .readEntityName(bundles[0].getProcessData()),
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
 
         //try to kill all instances by U2
-        r = prism.getProcessHelper().getProcessInstanceKill(Util
-                .readEntityName(bundles[0].getProcessData()),
+        r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
 
         //number of instances should be the same as before
@@ -401,28 +395,25 @@ public class AuthorizationTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
 
         //check that there are 3 running instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
         //check that there are 2 waiting instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2,
+                CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
         //3 instances should be running , other 2 should be waiting
-        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
-                .readEntityName(bundles[0].getProcessData()),
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
 
         //suspend 3 running instances
-        r = prism.getProcessHelper().getProcessInstanceSuspend(Util
-                .readEntityName(bundles[0].getProcessData()),
+        r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + midTime);
         InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
 
         //try to kill all instances by U2
-        r = prism.getProcessHelper().getProcessInstanceKill(Util
-                .readEntityName(bundles[0].getProcessData()),
+        r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
 
         //3 should still be suspended, 2 should be waiting
@@ -465,29 +456,24 @@ public class AuthorizationTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
 
         //check that there are 4 running instances
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 4, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 4,
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
         //4 instances should be running , 1 should be waiting
-        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
-                .readEntityName(bundles[0].getProcessData()),
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(),
             "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 5, 4, 0, 1, 0);
 
         //kill 3 running instances
-        r = prism.getProcessHelper().getProcessInstanceKill(Util
-            .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end="
-                +
-            midTime);
+        r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(),
+                "?start=" + startTime + "&end=" + midTime);
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
 
         //generally 3 instances should be killed, 1 is running and 1 is waiting
 
         //try to rerun instances by U2
-        r = prism.getProcessHelper().getProcessInstanceRerun(Util
-            .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end="
-                +
-            midTime, MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceRerun(bundles[0].getProcessName(),
+                "?start=" + startTime + "&end=" + midTime, MerlinConstants.USER2_NAME);
 
         //instances should still be killed
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
@@ -510,7 +496,7 @@ public class AuthorizationTest extends BaseTestClass {
         Assert.assertTrue(definition.contains(feed.getName()) && !definition.contains("(feed) not found"),
             "Feed should be already submitted");
         //update feed definition
-        FeedMerlin newFeed = new FeedMerlin(feed.toString());
+        FeedMerlin newFeed = new FeedMerlin(feed);
         newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
         //try to update feed by U2
         final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
@@ -589,8 +575,7 @@ public class AuthorizationTest extends BaseTestClass {
     @Test(enabled = false)
     public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception {
         FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        String process = bundles[0].getProcessData();
-        process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
         //submit both feeds
         bundles[0].submitClusters(prism);
         bundles[0].submitFeeds(prism);
@@ -600,13 +585,13 @@ public class AuthorizationTest extends BaseTestClass {
 
         //by U2 schedule process dependant on scheduled feed by U1
         ServiceResponse serviceResponse = prism.getProcessHelper()
-            .submitAndSchedule(process, MerlinConstants.USER2_NAME);
+            .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
         AssertUtil.assertSucceeded(serviceResponse);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
         String oldProcessBundleId = InstanceUtil
-            .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+            .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         String oldProcessUser =
             getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
 
@@ -615,7 +600,7 @@ public class AuthorizationTest extends BaseTestClass {
         String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
 
         //update feed definition
-        FeedMerlin newFeed = new FeedMerlin(feed.toString());
+        FeedMerlin newFeed = new FeedMerlin(feed);
         newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
 
         //update feed by U1
@@ -630,7 +615,7 @@ public class AuthorizationTest extends BaseTestClass {
         Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same");
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
         String newProcessUser =
             getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -641,8 +626,7 @@ public class AuthorizationTest extends BaseTestClass {
     @Test(enabled = false)
     public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Exception {
         FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        String process = bundles[0].getProcessData();
-        process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
         //submit both feeds
         bundles[0].submitClusters(prism);
         bundles[0].submitFeeds(prism);
@@ -651,18 +635,18 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING);
 
         //by U2 schedule process dependent on scheduled feed by U1
-        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process,
+        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(),
                 MerlinConstants.USER2_NAME);
         AssertUtil.assertSucceeded(serviceResponse);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //update feed definition
-        FeedMerlin newFeed = new FeedMerlin(feed.toString());
+        FeedMerlin newFeed = new FeedMerlin(feed);
         newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
 
         //get old process details
         String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         String oldProcessUser =
                 getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
 
@@ -682,7 +666,7 @@ public class AuthorizationTest extends BaseTestClass {
         Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser);
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
         String newProcessUser =
                 getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -693,8 +677,7 @@ public class AuthorizationTest extends BaseTestClass {
     @Test(enabled = false)
     public void u1ScheduleFeedU1ScheduleDependantProcessU1UpdateProcess() throws Exception {
         String feed = bundles[0].getInputFeedFromBundle();
-        String process = bundles[0].getProcessData();
-        process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
         //submit both feeds
         bundles[0].submitClusters(prism);
         bundles[0].submitFeeds(prism);
@@ -703,13 +686,13 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
 
         //by U1 schedule process dependent on scheduled feed by U1
-        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process);
+        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData());
         AssertUtil.assertSucceeded(serviceResponse);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
         String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         String oldProcessUser =
                 getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
 
@@ -718,16 +701,16 @@ public class AuthorizationTest extends BaseTestClass {
                 .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
 
         //update process by U1
-        ProcessMerlin processObj = new ProcessMerlin(process);
+        ProcessMerlin processObj = bundles[0].getProcessObject();
         processObj.setProperty("randomProp", "randomVal");
-        serviceResponse = prism.getProcessHelper().update(process, processObj.toString());
+        serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString());
         AssertUtil.assertSucceeded(serviceResponse);
 
         //new feed bundle should not be created
         OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
 
         //new process bundle should be created by U1
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
         String newProcessUser =
                 getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
         Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
@@ -738,8 +721,7 @@ public class AuthorizationTest extends BaseTestClass {
     @Test(enabled = false)
     public void u1ScheduleFeedU1ScheduleDependantProcessU2UpdateProcess() throws Exception {
         String feed = bundles[0].getInputFeedFromBundle();
-        String process = bundles[0].getProcessData();
-        process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z");
         //submit both feeds
         bundles[0].submitClusters(prism);
         bundles[0].submitFeeds(prism);
@@ -748,13 +730,13 @@ public class AuthorizationTest extends BaseTestClass {
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
 
         //by U1 schedule process dependent on scheduled feed by U1
-        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process);
+        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData());
         AssertUtil.assertSucceeded(serviceResponse);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
 
         //get old process details
         String oldProcessBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+                .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         String oldProcessUser =
                 getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
 
@@ -763,9 +745,9 @@ public class AuthorizationTest extends BaseTestClass {
                 .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
 
         //update process by U2
-        ProcessMerlin processObj = new ProcessMerlin(process);
+        ProcessMerlin processObj = bundles[0].getProcessObject();
         processObj.setProperty("randomProp", "randomVal");
-        serviceResponse = prism.getProcessHelper().update(process, processObj.toString(),
+        serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString(),
                 TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.USER2_NAME);
         AssertUtil.assertSucceeded(serviceResponse);
 
@@ -773,7 +755,7 @@ public class AuthorizationTest extends BaseTestClass {
         OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
 
         //new process bundle should be created by U2
-        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
         String newProcessUser =
                 getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
         Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same");

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
index b7eb77f..33b0e77 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.regression;
 
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -149,7 +148,7 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
 
     private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException {
         List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
-                new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
+                bundle.getProcessName(), EntityType.PROCESS);
         String coordID = bundles.get(0);
         List<String> missingDependencies =
                 OozieUtil.getMissingDependencies(prismHelper, coordID);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
index 41e3002..07292e1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.regression;
 
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.BundleUtil;
@@ -157,7 +156,7 @@ public class ELValidationsTest extends BaseTestClass {
             List<String> bundles = null;
             for (int i = 0; i < 10; ++i) {
                 bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
-                    new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
+                    bundle.getProcessName(), EntityType.PROCESS);
                 if (bundles.size() > 0) {
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
index 6b227d6..8eff8e4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
@@ -143,30 +143,30 @@ public class ExternalFSTest extends BaseTestClass{
             new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator);
 
         //configure feed
-        String feed = bundles[0].getDataSets().get(0);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
         String targetDataLocation = endpoint + testWasbTargetDir + datePattern;
-        feed = InstanceUtil.setFeedFilePath(feed, sourcePath + '/' + datePattern);
+        feed.setFilePath(sourcePath + '/' + datePattern);
         //erase all clusters from feed definition
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
         //set local cluster as source
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTime, endTime)
                 .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                .build());
         //set externalFS cluster as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTime, endTime)
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(targetDataLocation)
-                .build()).toString();
+                .build());
 
         //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
         datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator);
         //upload necessary data
         DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -181,15 +181,15 @@ public class ExternalFSTest extends BaseTestClass{
         Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
 
         //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()),
                 "REPLICATION"), 1);
 
         TimeUtil.sleepSeconds(10);
         //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed), 1,
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
index a6639ed..6dbe60e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
@@ -77,7 +77,7 @@ public class FeedInstanceListingTest extends BaseTestClass{
         bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
         HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index 2c8346d..61ef8f3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -87,32 +87,32 @@ public class FeedLateRerunTest extends BaseTestClass {
         LOGGER.info("Time range between : " + startTime + " and " + endTime);
 
         //configure feed
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(feedDataLocation);
         //erase all clusters from feed definition
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
         //set cluster1 as source
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTime, endTime)
                 .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                .build());
         //set cluster2 as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
+        feed.addFeedCluster(
             new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
                 .withRetention("days(1000000)", ActionType.DELETE)
                 .withValidity(startTime, endTime)
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(targetDataLocation)
-                .build()).toString();
-        String entityName = Util.readEntityName(feed);
+                .build());
+        String entityName = feed.getName();
 
         //submit and schedule feed
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
 
         //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
         Assert.assertEquals(InstanceUtil
             .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index eb8c4fe..de4f692 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -112,29 +112,29 @@ public class FeedReplicationTest extends BaseTestClass {
         LOGGER.info("Time range between : " + startTime + " and " + endTime);
 
         //configure feed
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(feedDataLocation);
         //erase all clusters from feed definition
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
         //set cluster1 as source
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.SOURCE)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.SOURCE)
+                .build());
         //set cluster2 as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.TARGET)
+                .withDataLocation(targetDataLocation)
+                .build());
 
         //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
 
         //upload necessary data
         DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -152,15 +152,15 @@ public class FeedReplicationTest extends BaseTestClass {
         }
 
         //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
 
         Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                        "REPLICATION"), 1);
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed.toString()),
+                "REPLICATION"), 1);
 
         //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
@@ -191,37 +191,37 @@ public class FeedReplicationTest extends BaseTestClass {
         LOGGER.info("Time range between : " + startTime + " and " + endTime);
 
         //configure feed
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(feedDataLocation);
         //erase all clusters from feed definition
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
         //set cluster1 as source
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.SOURCE)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.SOURCE)
+                .build());
         //set cluster2 as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.TARGET)
+                .withDataLocation(targetDataLocation)
+                .build());
         //set cluster3 as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.TARGET)
+                .withDataLocation(targetDataLocation)
+                .build());
 
         //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
 
         //upload necessary data
         DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -240,23 +240,23 @@ public class FeedReplicationTest extends BaseTestClass {
         }
 
         //check if all coordinators exist
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
 
-        InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, feed.toString(), 0);
 
         Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                        "REPLICATION"), 1);
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(),
+                "REPLICATION"), 1);
         Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
-                        "REPLICATION"), 1);
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(),
+                "REPLICATION"), 1);
         //replication on cluster 2 should start, wait till it ends
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //replication on cluster 3 should start, wait till it ends
-        InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+        InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
@@ -299,29 +299,29 @@ public class FeedReplicationTest extends BaseTestClass {
         FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
         feedElement.setAvailabilityFlag(availabilityFlagName);
         bundles[0].writeFeedElement(feedElement, feedName);
-        String feed = bundles[0].getDataSets().get(0);
-        feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation);
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(feedDataLocation);
         //erase all clusters from feed definition
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+        feed.clearFeedClusters();
         //set cluster1 as source
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.SOURCE)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.SOURCE)
+                .build());
         //set cluster2 as target
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build()).toString();
+        feed.addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                .withRetention("days(1000000)", ActionType.DELETE)
+                .withValidity(startTime, endTime)
+                .withClusterType(ClusterType.TARGET)
+                .withDataLocation(targetDataLocation)
+                .build());
 
         //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
 
         //upload necessary data
         DateTime date = new DateTime(startTime, DateTimeZone.UTC);
@@ -339,7 +339,7 @@ public class FeedReplicationTest extends BaseTestClass {
         }
 
         //check while instance is got created
-        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0);
 
         //check if coordinator exists
         Assert.assertEquals(InstanceUtil
@@ -357,12 +357,12 @@ public class FeedReplicationTest extends BaseTestClass {
                 OSUtil.OOZIE_EXAMPLE_INPUT_DATA + availabilityFlagName);
 
         //check if instance become running
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-                CoordinatorAction.Status.RUNNING, EntityType.FEED);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.FEED);
 
         //wait till instance succeed
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data was replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
index 4ce6026..47523d8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
@@ -149,17 +149,17 @@ public class LogMoverTest extends BaseTestClass {
     private boolean validate(boolean logFlag) throws Exception {
         String stagingDir= MerlinConstants.STAGING_LOCATION;
         String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs";
-        List<Path> logmoverPath = HadoopUtil
+        List<Path> logmoverPaths = HadoopUtil
                 .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
         if (logFlag) {
-            for(int index=0; index < logmoverPath.size(); index++) {
-                if (logmoverPath.get(index).toString().contains("SUCCEEDED")) {
+            for (Path logmoverPath : logmoverPaths) {
+                if (logmoverPath.toString().contains("SUCCEEDED")) {
                     return true;
                 }
             }
         } else {
-            for(int index=0; index < logmoverPath.size(); index++) {
-                if (logmoverPath.get(index).toString().contains("FAILED")) {
+            for (Path logmoverPath : logmoverPaths) {
+                if (logmoverPath.toString().contains("FAILED")) {
                     return true;
                 }
             }


[2/3] falcon git commit: FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index 13a9776..76d033c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@ -93,7 +93,8 @@ public class NewRetryTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
         endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
-        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessValidity(TimeUtil.dateToOozieDate(startDate.toDate()),
+            TimeUtil.dateToOozieDate(endDate.toDate()));
 
         FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
         feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
@@ -135,7 +136,7 @@ public class NewRetryTest extends BaseTestClass {
 
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
 
@@ -150,7 +151,7 @@ public class NewRetryTest extends BaseTestClass {
             prism.getProcessHelper()
                 .update((bundles[0].getProcessData()), bundles[0].getProcessData());
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -183,7 +184,7 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             for (int attempt = 0;
                  attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -205,7 +206,7 @@ public class NewRetryTest extends BaseTestClass {
                     .getMessage().contains("updated successfully"),
                     "process was not updated successfully");
                 String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                    Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                    bundles[0].getProcessName(), EntityType.PROCESS);
 
                 Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -242,7 +243,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) {
                 TimeUtil.sleepSeconds(10);
@@ -260,7 +261,7 @@ public class NewRetryTest extends BaseTestClass {
                     .getMessage().contains("updated successfully"),
                 "process was not updated successfully");
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -295,7 +296,7 @@ public class NewRetryTest extends BaseTestClass {
 
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             for (int attempt = 0;
                  attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) {
@@ -316,7 +317,7 @@ public class NewRetryTest extends BaseTestClass {
                     .getMessage().contains("updated successfully"),
                 "process was not updated successfully");
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -350,7 +351,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
 
@@ -360,11 +361,11 @@ public class NewRetryTest extends BaseTestClass {
 
             LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
             Assert.assertTrue(prism.getProcessHelper()
-                .update(Util.readEntityName(bundles[0].getProcessData()),
+                .update(bundles[0].getProcessName(),
                     null).getMessage()
                 .contains("updated successfully"), "process was not updated successfully");
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -398,7 +399,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
 
@@ -408,11 +409,11 @@ public class NewRetryTest extends BaseTestClass {
 
             LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
             Assert.assertTrue(
-                prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()),
+                prism.getProcessHelper().update(bundles[0].getProcessName(),
                     bundles[0].getProcessData()).getMessage()
                     .contains("updated successfully"), "process was not updated successfully");
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -449,7 +450,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
 
@@ -460,12 +461,12 @@ public class NewRetryTest extends BaseTestClass {
 
             LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
             Assert.assertTrue(prism.getProcessHelper()
-                    .update(Util.readEntityName(bundles[0].getProcessData()),
+                    .update(bundles[0].getProcessName(),
                         bundles[0].getProcessData()).getMessage()
                     .contains("updated successfully"),
                 "process was not updated successfully");
             String newBundleId = InstanceUtil
-                .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getProcessData()),
+                .getLatestBundleID(cluster, bundles[0].getProcessName(),
                     EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
@@ -503,7 +504,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
 
@@ -513,11 +514,11 @@ public class NewRetryTest extends BaseTestClass {
 
             LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
             Assert.assertFalse(
-                prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData())
+                prism.getProcessHelper().update(bundles[0].getProcessName()
                     , bundles[0].getProcessData()).getMessage().contains("updated successfully"),
                 "process was updated successfully!!!");
             String newBundleId = InstanceUtil.getLatestBundleID(cluster,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+                bundles[0].getProcessName(), EntityType.PROCESS);
 
             Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
 
@@ -555,7 +556,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             //now to validate all failed instances to check if they were retried or not.
             validateRetry(clusterOC, bundleId,
@@ -595,7 +596,7 @@ public class NewRetryTest extends BaseTestClass {
 
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             for (int attempt = 0;
                  attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
@@ -608,7 +609,7 @@ public class NewRetryTest extends BaseTestClass {
             LOGGER.info("now firing user reruns:");
             for (int i = 0; i < 1; i++) {
                 prism.getProcessHelper()
-                    .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                    .getProcessInstanceRerun(bundles[0].getProcessName(),
                         "?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z"
                             + "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z");
             }
@@ -648,7 +649,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()),
+                bundles[0].getProcessName(),
                 EntityType.PROCESS).get(0);
 
             //now to validate all failed instances to check if they were retried or not.
@@ -659,7 +660,7 @@ public class NewRetryTest extends BaseTestClass {
 
             DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId);
             InstancesResult piResult = prism.getProcessHelper()
-                .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                .getProcessInstanceRerun(bundles[0].getProcessName(),
                     "?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z"
                         + "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1])
                          .replace("/", "T") + "Z");
@@ -702,7 +703,7 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
             List<DateTime> dates = null;
 
             for (int i = 0; i < 10 && dates == null; ++i) {
@@ -802,7 +803,7 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
             List<DateTime> dates = null;
 
             for (int i = 0; i < 10 && dates == null; ++i) {
@@ -878,7 +879,7 @@ public class NewRetryTest extends BaseTestClass {
                 prism.getProcessHelper().schedule(bundles[0].getProcessData()));
             //now wait till the process is over
             String bundleId = OozieUtil.getBundles(clusterOC,
-                Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
+                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
 
             validateRetry(clusterOC, bundleId,
                 (bundles[0].getProcessObject().getRetry().getAttempts()) / 2);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 59a701d..61c076b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@ -83,10 +83,9 @@ public class NoOutputProcessTest extends BaseTestClass {
         bundles[0].setInputFeedDataPath(inputPath);
         bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
         process.setOutputs(null);
         process.setLateProcess(null);
-        bundles[0].setProcessData(process.toString());
         bundles[0].submitFeedsScheduleProcess(prism);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
index 8cf2862..d7331cf 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
@@ -30,7 +30,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.hadoop.fs.FileSystem;
@@ -103,7 +102,7 @@ public class ProcessFrequencyTest extends BaseTestClass {
             TimeUtil.oozieDateToDate(startDate));
         HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
 
-        final String processName = Util.readEntityName(bundles[0].getProcessData());
+        final String processName = bundles[0].getProcessName();
         //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
index 48cb59b..56e1474 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
@@ -119,11 +119,11 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
 
-        String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
+        String prefix = feed01.getFeedPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
         HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
 
-        prefix = InstanceUtil.getFeedPrefix(feed02.toString());
+        prefix = feed02.getFeedPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
         HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
 
@@ -179,7 +179,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
         String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
         // String processEndTime = InstanceUtil.getTimeWrtSystemTime(20);
 
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
         process.clearProcessCluster();
         process.addProcessCluster(
             new ProcessMerlin.ProcessClusterBuilder(
@@ -192,8 +192,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
                 .withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
                     TimeUtil.addMinsToTime(processStartTime, 45))
                 .build());
-        process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), feed02.getName(),
-            feed02.getName()));
+        process.addInputFeed(feed02.getName(), feed02.getName());
 
         //submit and schedule process
         prism.getProcessHelper().submitAndSchedule(process.toString());
@@ -204,42 +203,40 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
             Status.RUNNING, EntityType.PROCESS);
 
-        final String processName = Util.readEntityName(bundles[0].getProcessData());
-        InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(
-            processName, "?start=" + processStartTime
-            + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
+        InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
+                "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(processName,
+        responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(process.getName(),
             "?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
                 + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
             "?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
                 + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = prism.getProcessHelper().getProcessInstanceResume(processName,
+        responseInstance = prism.getProcessHelper().getProcessInstanceResume(process.getName(),
             "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName,
+        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
             "?start=" + TimeUtil.addMinsToTime(processStartTime, 16)
                 + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(processName,
+        responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(process.getName(),
             "?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);
 
-        responseInstance = prism.getProcessHelper().getProcessInstanceRerun(processName,
+        responseInstance = prism.getProcessHelper().getProcessInstanceRerun(process.getName(),
             "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
         AssertUtil.assertSucceeded(responseInstance);
         Assert.assertTrue(responseInstance.getInstances() != null);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index 34dfce3..f299128 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
@@ -83,7 +82,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
     }
 
     @AfterMethod(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index f558cc5..3893ffe 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,7 +78,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(6);
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
     }
 
     @AfterMethod(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index c9334eb..6003ee0 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -83,7 +82,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
     }
 
     @AfterMethod(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 58936a7..635e238 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -27,7 +27,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.AssertUtil;
@@ -97,7 +96,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
         HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index 26348bd..b7fed18 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.regression;
 
-import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -29,7 +28,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.falcon.resource.InstancesResult;
@@ -68,7 +66,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
-        processName = Util.readEntityName(bundles[0].getProcessData());
+        processName = bundles[0].getProcessName();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -115,8 +113,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, new ProcessMerlin(bundles[0]
-            .getProcessData()).getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
             "?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z");
         AssertUtil.assertSucceeded(r);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index b41cf05..40a4ad2 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -85,8 +85,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
         bundles[0].setProcessConcurrency(2);
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
-        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+        String inputName = bundles[0].getProcessObject().getFirstInputName();
         bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir));
 
         bundles[0].submitAndScheduleProcess();
@@ -126,8 +125,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setProcessConcurrency(2);
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
-        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+        String inputName = bundles[0].getProcessObject().getFirstInputName();
 
         bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
         bundles[0].submitAndScheduleProcess();
@@ -166,8 +164,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
         bundles[0].setProcessValidity(startTime, endTime);
         bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
         bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
-        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
+        String inputName = bundles[0].getProcessObject().getFirstInputName();
 
         bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
         bundles[0].setProcessConcurrency(2);
@@ -216,17 +213,17 @@ public class ProcessLateRerunTest extends BaseTestClass {
         // Increase the window of input for process
         bundles[0].setDatasetInstances(startInstance, endInstance);
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
-        String inputName = processMerlin.getInputs().getInputs().get(0).getName();
-        Input tempFeed = processMerlin.getInputs().getInputs().get(0);
+        ProcessMerlin process = bundles[0].getProcessObject();
+        String inputName = process.getFirstInputName();
+        Input tempFeed = process.getInputs().getInputs().get(0);
 
         Input gateInput = new Input();
         gateInput.setName("Gate");
         gateInput.setFeed(tempFeed.getFeed());
         gateInput.setEnd("now(0,1)");
         gateInput.setStart("now(0,1)");
-        processMerlin.getInputs().getInputs().add(gateInput);
-        bundles[0].setProcessData(processMerlin.toString());
+        process.getInputs().getInputs().add(gateInput);
+        bundles[0].setProcessData(process.toString());
 
         bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
index cd7eba4..c73140d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
@@ -80,7 +80,7 @@ public class ProcessSLATest extends BaseTestClass {
     @Test
     public void scheduleValidProcessSLA() throws Exception {
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
         processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
                 new Frequency("6", Frequency.TimeUnit.hours));
         bundles[0].setProcessData(processMerlin.toString());
@@ -95,7 +95,7 @@ public class ProcessSLATest extends BaseTestClass {
     @Test
     public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
         processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
                 new Frequency("3", Frequency.TimeUnit.hours));
         bundles[0].setProcessData(processMerlin.toString());
@@ -110,7 +110,7 @@ public class ProcessSLATest extends BaseTestClass {
     @Test
     public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
         processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
                 new Frequency("2", Frequency.TimeUnit.hours));
         bundles[0].setProcessData(processMerlin.toString());
@@ -131,7 +131,7 @@ public class ProcessSLATest extends BaseTestClass {
     @Test
     public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
         processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
         processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
                 new Frequency("4", Frequency.TimeUnit.hours));
@@ -147,7 +147,7 @@ public class ProcessSLATest extends BaseTestClass {
     @Test
     public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
 
-        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processMerlin = bundles[0].getProcessObject();
         processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
         processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
                 new Frequency("4", Frequency.TimeUnit.hours));

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
index 9886d76..ca612b8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java
@@ -236,7 +236,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
         prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
         prism.getFeedHelper().submitEntity(feed);
         prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
-        ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processObj = bundles[0].getProcessObject();
         processObj.setWorkflow(null);
         ServiceResponse response = prism.getProcessHelper().validateEntity(processObj.toString());
         AssertUtil.assertFailed(response);
@@ -253,7 +253,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass {
         prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
         prism.getFeedHelper().submitEntity(feed);
         prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle());
-        ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin processObj = bundles[0].getProcessObject();
         processObj.setWorkflow(null);
         ServiceResponse response = cluster.getProcessHelper().validateEntity(processObj.toString());
         AssertUtil.assertFailed(response);

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
index f9fcf8d..f58b0f6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java
@@ -67,20 +67,20 @@ public class EntitiesPatternSearchTest extends BaseTestClass {
 
         //submit different clusters, feeds and processes
         FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
         ClusterMerlin cluster = bundles[0].getClusterElement();
         String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
         String processNamePrefix = bundles[0].getProcessName() + '-';
         String feedNamePrefix = bundles[0].getInputFeedNameFromBundle() + '-';
-        List randomeNames = getPatternName();
-        for (int i = 0; i < randomeNames.size(); i++) {
-            process.setName(processNamePrefix + randomeNames.get(i));
+        List randomNames = getPatternName();
+        for (Object randomName : randomNames) {
+            process.setName(processNamePrefix + randomName);
             AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString()));
 
-            feed.setName(feedNamePrefix + randomeNames.get(i));
+            feed.setName(feedNamePrefix + randomName);
             AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
 
-            cluster.setName(clusterNamePrefix + randomeNames.get(i));
+            cluster.setName(clusterNamePrefix + randomName);
             AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster.toString()));
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
index 13b3b88..3ae44e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java
@@ -82,7 +82,7 @@ public class ListEntitiesTest extends BaseTestClass {
 
         //submit 10 different clusters, feeds and processes
         FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
         ClusterMerlin cluster = bundles[0].getClusterElement();
         String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-';
         String processNamePrefix = bundles[0].getProcessName() + '-';

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
index 202298e..7d05d6b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -253,8 +253,7 @@ public class HCatProcessTest extends BaseTestClass {
         feedObj.setName(inputFeed2Name);
         feedObj.getTable().setUri(inputTableUri2);
 
-        String inputFeed2 = feedObj.toString();
-        bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+        bundles[0].addInputFeedToBundle("inputData2", feedObj);
 
         String outputTableUri =
             "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -347,7 +346,7 @@ public class HCatProcessTest extends BaseTestClass {
         FeedMerlin feedObj = new FeedMerlin(outputFeed1);
         feedObj.setName(outputFeed2Name);
         feedObj.getTable().setUri(outputTableUri2);
-        bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0);
+        bundles[0].addOutputFeedToBundle("outputData2", feedObj);
 
         bundles[0].setProcessValidity(startDate, endDate);
         bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
@@ -433,8 +432,7 @@ public class HCatProcessTest extends BaseTestClass {
         FeedMerlin feedObj = new FeedMerlin(inputFeed1);
         feedObj.setName(inputFeed2Name);
         feedObj.getTable().setUri(inputTableUri2);
-        String inputFeed2 = feedObj.toString();
-        bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+        bundles[0].addInputFeedToBundle("inputData2", feedObj);
 
         String outputTableUri =
             "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
@@ -448,8 +446,7 @@ public class HCatProcessTest extends BaseTestClass {
         FeedMerlin feedObj2 = new FeedMerlin(outputFeed1);
         feedObj2.setName(outputFeed2Name);
         feedObj2.getTable().setUri(outputTableUri2);
-        String outputFeed2 = feedObj2.toString();
-        bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0);
+        bundles[0].addOutputFeedToBundle("outputData2", feedObj2);
         bundles[0].setProcessValidity(startDate, endDate);
         bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
         bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 4466c13..3f6dc66 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -130,8 +130,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
-                .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS);
 
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -139,16 +138,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
                 EntityType.PROCESS);
 
-        String updatedProcess = InstanceUtil
-                .setProcessFrequency(bundles[1].getProcessData(),
-                        new Frequency("" + 5, TimeUnit.minutes));
+        ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+        updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
 
-        LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
+        LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess.toString()));
 
         //now to update
         while (Util
                 .parseResponse(prism.getProcessHelper()
-                        .update((bundles[1].getProcessData()), updatedProcess))
+                        .update((bundles[1].getProcessData()), updatedProcess.toString()))
                 .getStatus() != APIResult.Status.SUCCEEDED) {
             LOGGER.info("update didn't SUCCEED in last attempt");
             TimeUtil.sleepSeconds(10);
@@ -186,7 +184,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         List<String> oldNominalTimes =
                 OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
@@ -256,7 +254,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(25);
 
         int initialConcurrency = bundles[1].getProcessObject().getParallel();
@@ -271,14 +269,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         //now to update
         AssertUtil.assertPartial(prism.getProcessHelper()
             .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
-        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
-        Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency);
-        Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath);
-        Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder());
+        ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+        Assert.assertEquals(process.getParallel(), initialConcurrency);
+        Assert.assertEquals(process.getWorkflow().getPath(), workflowPath);
+        Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
 
         String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
-        Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(),
-                workflowPath2);
+        Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), workflowPath2);
 
         Util.startService(cluster3.getProcessHelper());
         dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
@@ -296,13 +293,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
             LOGGER.info("WARNING: update did not succeed, retrying ");
             TimeUtil.sleepSeconds(20);
         }
-        prismString = getResponse(prism, bundles[1].getProcessData(), true);
-        Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
-                initialConcurrency + 3);
-        Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
-                workflowPath2);
-        Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
-                bundles[1].getProcessObject().getOrder());
+        process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+        Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+        Assert.assertEquals(process.getWorkflow().getPath(), workflowPath2);
+        Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         AssertUtil
                 .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -336,7 +330,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -344,16 +338,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 EntityType.PROCESS);
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
 
-        String updatedProcess = InstanceUtil
-                .setProcessFrequency(bundles[1].getProcessData(),
-                        new Frequency("" + 7, TimeUnit.minutes));
+        ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+        updatedProcess.setFrequency(new Frequency("7", TimeUnit.minutes));
 
         LOGGER.info("updated process: " + updatedProcess);
 
         //now to update
-
         ServiceResponse response =
-                prism.getProcessHelper().update(updatedProcess, updatedProcess);
+            prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
         AssertUtil.assertSucceeded(response);
         OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
                 bundles[1].getProcessData(), true, false);
@@ -361,7 +353,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String prismString = getResponse(prism, bundles[1].getProcessData(), true);
         Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
-                new ProcessMerlin(updatedProcess).getFrequency());
+                updatedProcess.getFrequency());
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
@@ -376,7 +368,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         String originalProcessData = bundles[1].getProcessData();
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -410,7 +402,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -443,7 +435,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         // future : should be verified using cord xml
         Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                bundles[1].getProcessName(), EntityType.PROCESS);
 
         boolean doesExist = false;
         OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -452,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 status != Job.Status.DONEWITHERROR) {
             int statusCount = InstanceUtil
                     .getInstanceCountWithStatus(cluster3,
-                            Util.readEntityName(bundles[1].getProcessData()),
+                            bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS);
             if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
@@ -460,7 +452,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 break;
             }
             status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                    bundles[1].getProcessName(), EntityType.PROCESS);
             Assert.assertNotNull(status,
                     "status must not be null!");
             TimeUtil.sleepSeconds(30);
@@ -494,7 +486,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
                 EntityType.PROCESS);
@@ -577,7 +569,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         AssertUtil.assertSucceeded(
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10);
@@ -610,7 +602,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
 
         Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                bundles[1].getProcessName(), EntityType.PROCESS);
 
         boolean doesExist = false;
         OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -619,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 status != Job.Status.DONEWITHERROR) {
             if (InstanceUtil
                     .getInstanceCountWithStatus(cluster3,
-                            Util.readEntityName(bundles[1].getProcessData()),
+                            bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS)
                     ==
@@ -628,7 +620,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 break;
             }
             status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                    bundles[1].getProcessName(), EntityType.PROCESS);
         }
 
         Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
@@ -670,7 +662,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
                 EntityType.PROCESS);
@@ -707,7 +699,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         Job.Status status =
                 OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         boolean doesExist = false;
         OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
@@ -716,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 status != Job.Status.DONEWITHERROR) {
             if (InstanceUtil
                     .getInstanceCountWithStatus(cluster3,
-                            Util.readEntityName(bundles[1].getProcessData()),
+                            bundles[1].getProcessName(),
                             org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
                             EntityType.PROCESS)
                     ==
@@ -725,13 +717,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 break;
             }
             status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
-                    Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                    bundles[1].getProcessName(), EntityType.PROCESS);
             TimeUtil.sleepSeconds(30);
         }
         Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
         OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
                         .getLatestBundleID(cluster3,
-                                Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
+                                bundles[1].getProcessName(), EntityType.PROCESS),
                 oldNominalTimes, bundles[1].getProcessData(), false,
                 true
         );
@@ -771,7 +763,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
                 EntityType.PROCESS);
@@ -793,13 +785,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         .getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) {
             TimeUtil.sleepSeconds(10);
         }
-        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
-        Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
-                initialConcurrency + 3);
-        Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
-                aggregator1Path);
-        Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
-                bundles[1].getProcessObject().getOrder());
+        ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+        Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+        Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+        Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
@@ -839,7 +828,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
 
@@ -867,13 +856,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
 
-        String prismString = getResponse(prism, bundles[1].getProcessData(), true);
-        Assert.assertEquals(new ProcessMerlin(prismString).getParallel(),
-                initialConcurrency + 3);
-        Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(),
-                aggregator1Path);
-        Assert.assertEquals(new ProcessMerlin(prismString).getOrder(),
-                bundles[1].getProcessObject().getOrder());
+        ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true));
+        Assert.assertEquals(process.getParallel(), initialConcurrency + 3);
+        Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path);
+        Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder());
         dualComparison(prism, cluster3, bundles[1].getProcessData());
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
@@ -913,7 +899,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         TimeUtil.sleepSeconds(20);
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -965,7 +951,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1090,7 +1076,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
 
@@ -1153,7 +1139,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         TimeUtil.sleepSeconds(30);
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
@@ -1220,7 +1206,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
         List<String> oldNominalTimes =
@@ -1228,15 +1214,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
 
-        String updatedProcess = InstanceUtil
-                .setProcessFrequency(bundles[1].getProcessData(),
-                        new Frequency("" + 5, TimeUnit.minutes));
+        ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+        updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes));
 
         LOGGER.info("updated process: " + updatedProcess);
 
         //now to update
         ServiceResponse response =
-                prism.getProcessHelper().update(updatedProcess, updatedProcess);
+            prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
         AssertUtil.assertSucceeded(response);
         InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10);
 
@@ -1272,7 +1257,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
@@ -1280,18 +1265,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
 
-        String updatedProcess = InstanceUtil
-                .setProcessFrequency(bundles[1].getProcessData(),
-                        new Frequency("" + 1, TimeUnit.months));
-        updatedProcess = InstanceUtil
-                .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
-                        endTime);
+        ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject());
+        updatedProcess.setFrequency(new Frequency("1", TimeUnit.months));
+        updatedProcess.setValidity(TimeUtil.getTimeWrtSystemTime(10), endTime);
 
         LOGGER.info("updated process: " + updatedProcess);
 
         //now to update
         ServiceResponse response =
-                prism.getProcessHelper().update(updatedProcess, updatedProcess);
+            prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString());
         AssertUtil.assertSucceeded(response);
         String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData());
         Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(),
@@ -1316,7 +1298,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
 
         List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
                 EntityType.PROCESS);
@@ -1351,7 +1333,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(30);
 
         OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
@@ -1393,7 +1375,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                                 .getValidity().getEnd()));
         Assert.assertEquals(InstanceUtil
                 .getProcessInstanceList(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
+                        bundles[1].getProcessName(), EntityType.PROCESS)
                 .size(), getExpectedNumberOfWorkflowInstances(newStartTime,
                 bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
 
@@ -1409,7 +1391,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
         String oldBundleId = InstanceUtil
                 .getLatestBundleID(cluster3,
-                        Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+                        bundles[1].getProcessName(), EntityType.PROCESS);
         TimeUtil.sleepSeconds(30);
 
         String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
@@ -1476,8 +1458,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
             //update workflow.xml
             hadoopFileEditor = new HadoopFileEditor(cluster1FS);
-            hadoopFileEditor.edit(new ProcessMerlin(b
-                            .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
+            hadoopFileEditor.edit(b.getProcessObject().getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
                     "<!-- some comment -->");
 
             //update
@@ -1553,7 +1534,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         throws Exception {
 
         while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
-                Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
+                bundle.getProcessName(), EntityType.PROCESS) != state) {
             //keep waiting
             TimeUtil.sleepSeconds(10);
         }
@@ -1566,7 +1547,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         while (coord.getStatus() != state) {
             TimeUtil.sleepSeconds(10);
             coord = getDefaultOozieCoord(coloHelper, InstanceUtil
-                    .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
+                    .getLatestBundleID(coloHelper, bundle.getProcessName(),
                             EntityType.PROCESS));
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index c9e373e..39f0268 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -93,7 +93,7 @@ public class OptionalInputTest extends BaseTestClass {
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
         LOGGER.info(Util.prettyPrintXml(process.toString()));
 
         bundles[0].submitAndScheduleBundle(prism, false);
@@ -125,7 +125,7 @@ public class OptionalInputTest extends BaseTestClass {
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        String processName = Util.readEntityName(bundles[0].getProcessData());
+        String processName = bundles[0].getProcessName();
         LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
         bundles[0].submitAndScheduleBundle(prism, false);
 
@@ -163,7 +163,7 @@ public class OptionalInputTest extends BaseTestClass {
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        String processName = Util.readEntityName(bundles[0].getProcessData());
+        String processName = bundles[0].getProcessName();
         LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
 
         bundles[0].submitAndScheduleBundle(prism, false);
@@ -232,11 +232,11 @@ public class OptionalInputTest extends BaseTestClass {
             LOGGER.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
         }
 
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
-        LOGGER.info(Util.prettyPrintXml(process.toString()));
+
+        LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
 
         bundles[0].submitAndScheduleBundle(prism, false);
-        InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, bundles[0].getProcessName(),
                 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
     }
 
@@ -262,10 +262,8 @@ public class OptionalInputTest extends BaseTestClass {
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
-        LOGGER.info(Util.prettyPrintXml(process.toString()));
-        String processName = process.getName();
-        LOGGER.info(Util.prettyPrintXml(process.toString()));
+        LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String processName = bundles[0].getProcessName();
 
         bundles[0].submitAndScheduleBundle(prism, true);
         InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
@@ -278,11 +276,9 @@ public class OptionalInputTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
             1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
-        final ProcessMerlin processMerlin = new ProcessMerlin(process);
-        processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
-        bundles[0].setProcessData(processMerlin.toString());
-        bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
-        process = new ProcessMerlin(bundles[0].getProcessData());
+        ProcessMerlin process = bundles[0].getProcessObject();
+        process.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
+        process.setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString()));
 
         prism.getProcessHelper().update(process.toString(), process.toString());
@@ -319,25 +315,22 @@ public class OptionalInputTest extends BaseTestClass {
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(4);
-        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
-        String processName = process.getName();
+        ProcessMerlin process = bundles[0].getProcessObject();
         LOGGER.info(Util.prettyPrintXml(process.toString()));
 
         bundles[0].submitAndScheduleBundle(prism, true);
-        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+        InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
-            inputPath + "/input1/", dataDates);
-        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
-            1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+                inputPath + "/input1/", dataDates);
+        InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
+                1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
-        final ProcessMerlin processMerlin = new ProcessMerlin(process);
-        processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
-        bundles[0].setProcessData(processMerlin.toString());
-        process = new ProcessMerlin(bundles[0].getProcessData());
+        process.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
+        bundles[0].setProcessData(process.toString());
 
         //delete all input data
         HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
@@ -347,7 +340,7 @@ public class OptionalInputTest extends BaseTestClass {
         prism.getProcessHelper().update(process.toString(), process.toString());
 
         //from now on ... it should wait of input0 also
-        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
+        InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(),
                 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
     }
 }