Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:17 UTC

[12/41] git commit: FALCON-646 Refactoring, documentation stuff. Contributed by Paul Isaychuk

FALCON-646 Refactoring, documentation stuff. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9774414a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9774414a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9774414a

Branch: refs/heads/FALCON-585
Commit: 9774414a6e654edf6071d70d630bcbb0a960b420
Parents: 23eed9f
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Fri Aug 29 13:12:19 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Fri Aug 29 13:12:19 2014 +0300

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   6 +-
 .../falcon/regression/core/util/Util.java       |  82 ++++++-----
 .../regression/FeedSubmitAndScheduleTest.java   |  32 ++---
 .../regression/prism/FeedRetentionTest.java     |  39 ++---
 .../regression/prism/OptionalInputTest.java     | 143 ++++++-------------
 5 files changed, 123 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index ad6ef79..f5cfb93 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,8 +9,10 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
-   
-   FALCON-572 HadoopUtil cleanup in falcon-regression (Ruslan Ostafiychuk via Samarth Gupta)  
+
+   FALCON-646 Refactoring, documentation stuff (Paul Isaychuk via Ruslan Ostafiychuk)
+
+   FALCON-572 HadoopUtil cleanup in falcon-regression (Ruslan Ostafiychuk via Samarth Gupta)
    FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
 
    FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
index 3fc9388..6485784 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
@@ -119,12 +119,10 @@ public final class Util {
     }
 
     public static APIResult parseResponse(ServiceResponse response) throws JAXBException {
-
         if (!isXML(response.getMessage())) {
             return new APIResult(APIResult.Status.FAILED, response.getMessage(), "somerandomstring",
                 response.getCode());
         }
-
         JAXBContext jc = JAXBContext.newInstance(APIResult.class);
         Unmarshaller u = jc.createUnmarshaller();
         APIResult temp;
@@ -143,7 +141,6 @@ public final class Util {
                 temp.setStatus(APIResult.Status.FAILED);
             }
         }
-
         return temp;
     }
 
@@ -170,30 +167,25 @@ public final class Util {
     }
 
     public static String getUniqueString() {
-
         return "-" + UUID.randomUUID().toString().split("-")[0];
     }
 
     public static List<String> getHadoopDataFromDir(FileSystem fs, String feed, String dir)
         throws IOException {
         List<String> finalResult = new ArrayList<String>();
-
         String feedPath = getFeedPath(feed);
         int depth = feedPath.split(dir)[1].split("/").length - 1;
         List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
             new Path(dir), depth);
-
         for (Path result : results) {
             int pathDepth = result.toString().split(dir)[1].split("/").length - 1;
             if (pathDepth == depth) {
                 finalResult.add(result.toString().split(dir)[1]);
             }
         }
-
         return finalResult;
     }
 
-
     public static String setFeedProperty(String feed, String propertyName, String propertyValue) {
         FeedMerlin feedObject = new FeedMerlin(feed);
         boolean found = false;
@@ -205,20 +197,15 @@ public final class Util {
                 break;
             }
         }
-
         if (!found) {
             Property property = new Property();
             property.setName(propertyName);
             property.setValue(propertyValue);
             feedObject.getProperties().getProperties().add(property);
         }
-
-
         return feedObject.toString();
-
     }
 
-
     public static String getFeedPath(String feed) {
         FeedMerlin feedObject = new FeedMerlin(feed);
         for (Location location : feedObject.getLocations().getLocations()) {
@@ -236,7 +223,6 @@ public final class Util {
         return feedObject.toString();
     }
 
-
     public static String setFeedPathValue(String feed, String pathValue) {
         FeedMerlin feedObject = new FeedMerlin(feed);
         for (Location location : feedObject.getLocations().getLocations()) {
@@ -247,11 +233,9 @@ public final class Util {
         return feedObject.toString();
     }
 
-
     public static String findFolderBetweenGivenTimeStamps(DateTime startTime, DateTime endTime,
                                                           List<String> folderList) {
         DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
-
         for (String folder : folderList) {
             if (folder.compareTo(formatter.print(startTime)) >= 0
                     &&
@@ -285,8 +269,7 @@ public final class Util {
                 .getQaHost(), coloHelper.getProcessHelper().getUsername(),
             coloHelper.getProcessHelper().getPassword(),
             "cat /var/log/ivory/application.* | grep \"" + workflowId + "\" | grep "
-                    +
-                "\"Received\" | awk '{print $2}'",
+                    + "\"Received\" | awk '{print $2}'",
             coloHelper.getProcessHelper().getUsername(),
             coloHelper.getProcessHelper().getIdentityFile()
         );
@@ -313,7 +296,6 @@ public final class Util {
         for (String line : raw) {
             finalList.add(line.split(",")[0]);
         }
-
         return finalList;
     }
 
@@ -327,7 +309,6 @@ public final class Util {
 
     public static void startService(IEntityManagerHelper helper)
         throws IOException, JSchException, AuthenticationException, URISyntaxException {
-
         ExecUtil.runRemoteScriptAsSudo(helper.getQaHost(), helper.getUsername(),
             helper.getPassword(), helper.getServiceStartCmd(), helper.getServiceUser(),
             helper.getIdentityFile());
@@ -372,17 +353,13 @@ public final class Util {
         }
     }
 
-
     public static String getEnvClusterXML(String cluster, String prefix) {
-
-        ClusterMerlin clusterObject =
-            getClusterObject(cluster);
+        ClusterMerlin clusterObject = getClusterObject(cluster);
         if ((null == prefix) || prefix.isEmpty()) {
             prefix = "";
         } else {
             prefix = prefix + ".";
         }
-
         String hcatEndpoint = Config.getProperty(prefix + "hcat_endpoint");
 
         //now read and set relevant values
@@ -401,7 +378,6 @@ public final class Util {
                 iface.setEndpoint(hcatEndpoint);
             }
         }
-
         //set colo name:
         clusterObject.setColo(Config.getProperty(prefix + "colo"));
         // properties in the cluster needed when secure mode is on
@@ -454,6 +430,13 @@ public final class Util {
         return null;
     }
 
+    /**
+     * Compares the definitions of an entity as stored on two servers.
+     * @param server1 server where the 1st definition is stored
+     * @param server2 server where the 2nd definition is stored
+     * @param entity entity under analysis
+     * @return true if the definitions are identical
+     */
     public static boolean isDefinitionSame(ColoHelper server1, ColoHelper server2,
                                            String entity)
         throws URISyntaxException, IOException, AuthenticationException, JAXBException,
@@ -463,10 +446,9 @@ public final class Util {
     }
 
     /**
-     * emuns used for instance api.
+     * Enums used for the instance API.
      */
     public enum URLS {
-
         LIST_URL("/api/entities/list"),
         SUBMIT_URL("/api/entities/submit"),
         GET_ENTITY_DEFINITION("/api/entities/definition"),
@@ -497,17 +479,27 @@ public final class Util {
         }
     }
 
-
+    /**
+     * @param pathString whole path
+     * @return path prefix, i.e. everything before the first '$'
+     */
     public static String getPathPrefix(String pathString) {
         return pathString.substring(0, pathString.indexOf('$'));
     }
 
+    /**
+     * @param path whole path
+     * @return file name extracted from the path
+     */
     public static String getFileNameFromPath(String path) {
-
         return path.substring(path.lastIndexOf('/') + 1, path.length());
     }
 
-
+    /**
+     * Determines the HTTP method to use for a given request URL.
+     * @param url request URL
+     * @return HTTP method name to use ("get" when nothing else matches)
+     */
     public static String getMethodType(String url) {
         List<String> postList = new ArrayList<String>();
         postList.add("/entities/validate");
@@ -531,10 +523,14 @@ public final class Util {
                 return "delete";
             }
         }
-
         return "get";
     }
 
+    /**
+     * Pretty-prints XML.
+     * @param xmlString XML string to format
+     * @return formatted XML, or the input unchanged if it cannot be transformed
+     */
     public static String prettyPrintXml(final String xmlString) {
         if (xmlString == null) {
             return null;
@@ -554,19 +550,27 @@ public final class Util {
         } catch (TransformerException e) {
             return xmlString;
         }
-
     }
 
+    /**
+     * Converts a JSON string to readable form.
+     * @param jsonString JSON string to format
+     * @return pretty-printed JSON
+     */
     public static String prettyPrintJson(final String jsonString) {
         if (jsonString == null) {
             return null;
         }
         Gson gson = new GsonBuilder().setPrettyPrinting().create();
         JsonElement json = new JsonParser().parse(jsonString);
-
         return gson.toJson(json);
     }
 
+    /**
+     * Pretty-prints a string that holds either XML or JSON.
+     * @param str XML or JSON string
+     * @return formatted XML or JSON
+     */
     public static String prettyPrintXmlOrJson(final String str) {
         if (str == null) {
             return null;
@@ -583,6 +587,13 @@ public final class Util {
         return str;
     }
 
+    /**
+     * Tries to get entity definition.
+     * @param cluster cluster where definition is stored
+     * @param entity entity for which definition is required
+     * @param shouldReturn whether the retrieval is expected to succeed
+     * @return entity definition
+     */
     public static String getEntityDefinition(ColoHelper cluster,
                                              String entity,
                                              boolean shouldReturn) throws
@@ -597,10 +608,8 @@ public final class Util {
         } else {
             helper = cluster.getClusterHelper();
         }
-
         ServiceResponse response = helper.getEntityDefinition(URLS
             .GET_ENTITY_DEFINITION, entity);
-
         if (shouldReturn) {
             AssertUtil.assertSucceeded(response);
         } else {
@@ -608,7 +617,6 @@ public final class Util {
         }
         String result = response.getMessage();
         Assert.assertNotNull(result);
-
         return result;
     }
 }
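
The javadoc added above documents Util's formatting and request helpers. As a quick orientation, here is a minimal sketch of how they might be called (a hypothetical class, assuming merlin-core and its dependencies are on the classpath; the exact URL-to-verb matching of getMethodType() is as shown in the hunks above):

    import org.apache.falcon.regression.core.util.Util;

    public final class UtilHelpersSketch {
        public static void main(String[] args) {
            // prettyPrintXmlOrJson() picks the right formatter for the payload
            System.out.println(Util.prettyPrintXmlOrJson(
                "<result><status>SUCCEEDED</status></result>"));
            System.out.println(Util.prettyPrintXmlOrJson(
                "{\"status\":\"SUCCEEDED\",\"requestId\":\"0\"}"));

            // getMethodType() maps a request URL to an HTTP verb;
            // "/entities/validate" is in the post list, unmatched URLs fall back to "get"
            System.out.println(Util.getMethodType("/api/entities/validate/process"));
            System.out.println(Util.getMethodType("/api/entities/list"));
        }
    }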

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
index ab13dd4..38cf080 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
@@ -54,6 +54,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
     private OozieClient clusterOC = serverOC.get(0);
     private String aggregateWorkflowDir = baseHDFSDir + "/FeedSubmitAndScheduleTest/aggregator";
     private static final Logger LOGGER = Logger.getLogger(FeedSubmitAndScheduleTest.class);
+    private String feed;
 
     @BeforeMethod(alwaysRun = true)
     public void uploadWorkflow() throws Exception {
@@ -67,6 +68,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
         bundles[0] = new Bundle(bundles[0], cluster);
         bundles[0].generateUniqueBundle();
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        feed = bundles[0].getDataSets().get(0);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -91,8 +93,8 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
         throws JAXBException, IOException, URISyntaxException, AuthenticationException {
         Assert.assertEquals(Util.parseResponse(prism.getClusterHelper()
             .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0))).getStatusCode(), 200);
-        ServiceResponse response = prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(URLS
+            .SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(response);
     }
 
@@ -109,17 +111,16 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
 
         //get created bundle id
         String bundleId = InstanceUtil
-            .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
-                EntityType.FEED);
+            .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
 
         //try to submit and schedule the same process again
         ServiceResponse response = prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(response);
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
 
         //check that new bundle wasn't created
-        OozieUtil.verifyNewBundleCreation(cluster, bundleId, null, bundles[0].getDataSets().get(0), false, false);
+        OozieUtil.verifyNewBundleCreation(cluster, bundleId, null, feed, false, false);
     }
 
     /**
@@ -131,7 +132,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
     @Test(groups = {"singleCluster"})
     public void snsFeedWithoutCluster() throws Exception {
         ServiceResponse response = prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertFailed(response);
     }
 
@@ -145,14 +146,11 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
     public void snsDeletedFeed() throws Exception {
         submitFirstClusterScheduleFirstFeed();
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-        Assert.assertEquals(
-            Util.parseResponse(prism.getFeedHelper()
-                .delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)))
-                .getStatusCode(), 200);
+        Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().delete(URLS.DELETE_URL,
+            feed)).getStatusCode(), 200);
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.KILLED);
-
         ServiceResponse response = prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(response);
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
     }
@@ -168,14 +166,10 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
         submitFirstClusterScheduleFirstFeed();
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
         Assert.assertEquals(Util.parseResponse(
-                prism.getFeedHelper()
-                    .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)))
-                .getStatusCode(),
-            200);
+            prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed)).getStatusCode(), 200);
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
         ServiceResponse response = prism.getFeedHelper()
-            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
-
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(response);
         AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
     }
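
The change to this test class is one refactoring move applied throughout: the repeated expression bundles[0].getDataSets().get(0) is hoisted into the new feed field, initialized once in the @BeforeMethod setup. A self-contained TestNG sketch of the same pattern (hypothetical class, detached from the Falcon helpers):

    import org.testng.Assert;
    import org.testng.annotations.BeforeMethod;
    import org.testng.annotations.Test;

    public class HoistedFixtureSketch {
        private String feed;  // set once per test instead of repeating the accessor chain

        @BeforeMethod(alwaysRun = true)
        public void setUp() {
            feed = firstDataSet();  // stand-in for bundles[0].getDataSets().get(0)
        }

        @Test
        public void usesHoistedField() {
            Assert.assertNotNull(feed);  // test bodies reference the short field name
        }

        private String firstDataSet() {
            return "<feed name='sample'/>";  // hypothetical payload
        }
    }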

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
index e0571b5..f20877f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
@@ -98,6 +98,10 @@ public class FeedRetentionTest extends BaseTestClass {
         String inputData = inputPath + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
         String outputPathTemplate = baseHDFSDir +
             "/testOutput/op%d/ivoryRetention0%d/%s/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+        String cluster1Def = bundles[0].getClusters().get(0);
+        String cluster2Def = bundles[1].getClusters().get(0);
+        String cluster1Name = Util.readEntityName(cluster1Def);
+        String cluster2Name = Util.readEntityName(cluster2Def);
 
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(10), 1);
@@ -106,11 +110,10 @@ public class FeedRetentionTest extends BaseTestClass {
         HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.RESOURCES + "thriftRRMar0602.gz",
             inputPath, dataDates);
 
-        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
-        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster1Def);
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster2Def);
 
         String feedOutput01 = bundles[0].getFeed("FETL-RequestRC");
-
         feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
             XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
@@ -119,8 +122,7 @@ public class FeedRetentionTest extends BaseTestClass {
         feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}",
+            cluster1Name, ClusterType.SOURCE, "${cluster.colo}",
             String.format(outputPathTemplate, 1, 1, "data"),
             String.format(outputPathTemplate, 1, 1, "stats"),
             String.format(outputPathTemplate, 1, 1, "meta"),
@@ -129,8 +131,7 @@ public class FeedRetentionTest extends BaseTestClass {
         feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}",
+            cluster2Name, ClusterType.SOURCE, "${cluster.colo}",
             String.format(outputPathTemplate, 1, 2, "data"),
             String.format(outputPathTemplate, 1, 2, "stats"),
             String.format(outputPathTemplate, 1, 2, "meta"),
@@ -149,8 +150,7 @@ public class FeedRetentionTest extends BaseTestClass {
         feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}",
+            cluster1Name, ClusterType.SOURCE, "${cluster.colo}",
             String.format(outputPathTemplate, 2, 1, "data"),
             String.format(outputPathTemplate, 2, 1, "stats"),
             String.format(outputPathTemplate, 2, 1, "meta"),
@@ -159,8 +159,7 @@ public class FeedRetentionTest extends BaseTestClass {
         feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}",
+            cluster2Name, ClusterType.SOURCE, "${cluster.colo}",
             String.format(outputPathTemplate, 2, 2, "data"),
             String.format(outputPathTemplate, 2, 2, "stats"),
             String.format(outputPathTemplate, 2, 2, "meta"),
@@ -171,8 +170,7 @@ public class FeedRetentionTest extends BaseTestClass {
             prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOutput02));
 
         String feedInput = bundles[0].getFeed("FETL2-RRLog");
-        feedInput = InstanceUtil
-            .setFeedCluster(feedInput,
+        feedInput = InstanceUtil.setFeedCluster(feedInput,
                 XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
                 XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
                 ClusterType.SOURCE, null);
@@ -180,14 +178,12 @@ public class FeedRetentionTest extends BaseTestClass {
         feedInput = InstanceUtil.setFeedCluster(feedInput,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}", inputData);
+            cluster1Name, ClusterType.SOURCE, "${cluster.colo}", inputData);
 
         feedInput = InstanceUtil.setFeedCluster(feedInput,
             XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
             XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
-            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
-            "${cluster.colo}", inputData);
+            cluster2Name, ClusterType.SOURCE, "${cluster.colo}", inputData);
 
         AssertUtil.assertSucceeded(
             prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedInput));
@@ -196,22 +192,17 @@ public class FeedRetentionTest extends BaseTestClass {
         process = InstanceUtil.setProcessCluster(process, null,
             XmlUtil.createProcessValidity("2012-10-01T12:00Z", "2012-10-01T12:10Z"));
 
-        process = InstanceUtil.setProcessCluster(process,
-            Util.readEntityName(bundles[0].getClusters().get(0)),
+        process = InstanceUtil.setProcessCluster(process, cluster1Name,
             XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
                 TimeUtil.getTimeWrtSystemTime(5)));
-        process = InstanceUtil.setProcessCluster(process,
-            Util.readEntityName(bundles[1].getClusters().get(0)),
+        process = InstanceUtil.setProcessCluster(process, cluster2Name,
             XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
                 TimeUtil.getTimeWrtSystemTime(5)));
 
         logger.info("process: " + Util.prettyPrintXml(process));
-
         AssertUtil.assertSucceeded(
             prism.getProcessHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process));
-
         AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput01));
         AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput02));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index 4b8fe0a..01c00a9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -81,28 +81,24 @@ public class OptionalInputTest extends BaseTestClass {
      */
     @Test(enabled = true, groups = {"singleCluster"})
     public void optionalTest_1optional_1compulsary() throws Exception {
-        bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, "2010-01-02T01:00Z",
-            "2010-01-02T01:12Z");
+        bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1,
+                "2010-01-02T01:00Z", "2010-01-02T01:12Z");
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        logger.info(Util.prettyPrintXml(process));
 
         bundles[0].submitAndScheduleBundle(prism, false);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
             "2010-01-02T01:10Z", 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input1/", dataDates);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
                 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 
@@ -115,37 +111,30 @@ public class OptionalInputTest extends BaseTestClass {
      */
     @Test(enabled = true, groups = {"singleCluster"})
     public void optionalTest_1optional_2compulsary() throws Exception {
-        bundles[0].generateRequiredBundle(1, 3, 1, inputPath, 1, "2010-01-02T01:00Z",
-            "2010-01-02T01:12Z");
-
+        bundles[0].generateRequiredBundle(1, 3, 1, inputPath, 1,
+                "2010-01-02T01:00Z", "2010-01-02T01:12Z");
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
+        String processName = Util.readEntityName(bundles[0].getProcessData());
         logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
-
         bundles[0].submitAndScheduleBundle(prism, false);
 
         logger.info("instanceShouldStillBeInWaitingState");
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
-            "2010-01-02T01:10Z", 5);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+                "2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input1/", dataDates);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input2/", dataDates);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 
@@ -160,32 +149,24 @@ public class OptionalInputTest extends BaseTestClass {
     public void optionalTest_2optional_1compulsary() throws Exception {
         bundles[0].generateRequiredBundle(1, 3, 2, inputPath, 1, "2010-01-02T01:00Z",
             "2010-01-02T01:12Z");
-
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
+        String processName = Util.readEntityName(bundles[0].getProcessData());
         logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
 
         bundles[0].submitAndScheduleBundle(prism, false);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
-            "2010-01-02T01:10Z", 5);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+                "2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input2/", dataDates);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 
@@ -201,16 +182,15 @@ public class OptionalInputTest extends BaseTestClass {
         String startTime = TimeUtil.getTimeWrtSystemTime(-4);
         String endTime = TimeUtil.getTimeWrtSystemTime(10);
         bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        logger.info(Util.prettyPrintXml(process));
 
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
@@ -220,12 +200,8 @@ public class OptionalInputTest extends BaseTestClass {
         for (String date : dataDates) {
             HadoopUtil.recreateDir(clusterFS, inputPath + "/input0/" + date);
         }
-
         bundles[0].submitFeedsScheduleProcess(prism);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
                 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 
@@ -237,24 +213,19 @@ public class OptionalInputTest extends BaseTestClass {
      */
     @Test(enabled = true, groups = {"singleCluster"})
     public void optionalTest_allInputOptional() throws Exception {
-        bundles[0].generateRequiredBundle(1, 2, 2, inputPath, 1, "2010-01-02T01:00Z",
-            "2010-01-02T01:12Z");
-
+        bundles[0].generateRequiredBundle(1, 2, 2, inputPath, 1,
+                "2010-01-02T01:00Z", "2010-01-02T01:12Z");
         bundles[0].setProcessInputNames("inputData");
-
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
-        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        logger.info(Util.prettyPrintXml(process));
 
         bundles[0].submitAndScheduleBundle(prism, false);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
                 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
     }
 
@@ -271,55 +242,43 @@ public class OptionalInputTest extends BaseTestClass {
         String startTime = TimeUtil.getTimeWrtSystemTime(-4);
         String endTime = TimeUtil.getTimeWrtSystemTime(30);
         bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(2);
-        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        String processName = Util.getProcessName(process);
+        logger.info(Util.prettyPrintXml(process));
 
         bundles[0].submitAndScheduleBundle(prism, true);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input1/", dataDates);
 
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
-        final ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        final ProcessMerlin processMerlin = new ProcessMerlin(process);
         processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
         bundles[0].setProcessData(processMerlin.toString());
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
-        logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
+        process = bundles[0].getProcessData();
+        logger.info("modified process:" + Util.prettyPrintXml(process));
 
-        prism.getProcessHelper().update(bundles[0].getProcessData(), bundles[0].getProcessData());
+        prism.getProcessHelper().update(process, process);
 
         //from now on it should wait for input0 as well
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input0/", dataDates);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
     }
 
@@ -336,52 +295,42 @@ public class OptionalInputTest extends BaseTestClass {
         String startTime = TimeUtil.getTimeWrtSystemTime(-4);
         String endTime = TimeUtil.getTimeWrtSystemTime(30);
         bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
         for (int i = 0; i < bundles[0].getClusters().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
         for (int i = 0; i < bundles[0].getDataSets().size(); i++)
             logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
 
         bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
         bundles[0].setProcessConcurrency(4);
-        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        String processName = Util.getProcessName(process);
+        logger.info(Util.prettyPrintXml(process));
 
         bundles[0].submitAndScheduleBundle(prism, true);
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
             TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
             inputPath + "/input1/", dataDates);
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
-        final ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        final ProcessMerlin processMerlin = new ProcessMerlin(process);
         processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
         bundles[0].setProcessData(processMerlin.toString());
+        process = bundles[0].getProcessData();
 
         //delete all input data
         HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
-
         bundles[0].setProcessInputNames("inputData0", "inputData");
+        logger.info("modified process:" + Util.prettyPrintXml(process));
 
-        logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
+        prism.getProcessHelper().update(process, process);
 
-        prism.getProcessHelper().update(bundles[0].getProcessData(), bundles[0].getProcessData());
-
-        logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
         //from now on it should wait for input0 as well
-
-        InstanceUtil
-            .waitTillInstanceReachState(oozieClient,
-                Util.getProcessName(bundles[0].getProcessData()),
+        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
     }
 }
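
Every test in this class follows the same rhythm: schedule the process, assert its instances stay WAITING while a compulsory input has no data, push data for that input, then assert the instances reach SUCCEEDED (or KILLED when no input can arrive). A condensed sketch of that rhythm, reusing only helpers and fields already visible above (a hypothetical extra test, not part of this commit):

    @Test(groups = {"singleCluster"})
    public void optionalInputRhythmSketch() throws Exception {
        bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1,
                "2010-01-02T01:00Z", "2010-01-02T01:12Z");
        bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
        String processName = Util.getProcessName(bundles[0].getProcessData());

        bundles[0].submitAndScheduleBundle(prism, false);
        // with the compulsory input empty, both instances must sit in WAITING
        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);

        // supply data for the compulsory input only; the optional one stays empty
        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
                "2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
                inputPath + "/input1/", dataDates);
        InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
                2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
    }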