You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/28 08:07:58 UTC

[1/3] git commit: FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored (contributed by Paul Isaychuk)

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 1db8712d6 -> ffe18b0ce


FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored (contributed by Paul Isaychuk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/78b9c1a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/78b9c1a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/78b9c1a9

Branch: refs/heads/master
Commit: 78b9c1a91b698d78d881eb5555352ebc4b259e85
Parents: 1db8712
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Thu Aug 28 11:25:12 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Thu Aug 28 11:25:12 2014 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/InstanceSummaryTest.java  |   4 +-
 .../prism/UpdateAtSpecificTimeTest.java         | 441 +++++++------------
 3 files changed, 167 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/78b9c1a9/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 1357808..5ed6a89 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -8,6 +8,9 @@ Trunk (Unreleased)
    FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G 
    via Samarth Gupta)
   IMPROVEMENTS
+   FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored
+   (Paul Isaychuk via Samarth Gupta)
+
    FALCON-619 ELExp_FutureAndLatestTest stabilization (Paul Isaychuk via Arpit Gupta)
 
    FALCON-610 Refactoring and documentation updates (Paul Isaychuk via Arpit Gupta)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/78b9c1a9/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
index c541620..9901fb1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
@@ -53,6 +53,7 @@ import java.util.List;
 this test currently provide minimum verification. More detailed test should
  be added
  */
+@Test(groups = "embedded")
 public class InstanceSummaryTest extends BaseTestClass {
 
     //1. process : test summary single cluster few instance some future some past
@@ -304,8 +305,5 @@ public class InstanceSummaryTest extends BaseTestClass {
     public void tearDown() throws IOException {
         processBundle.deleteBundle(prism);
         removeBundles();
-        for (FileSystem fs : serverFS) {
-            HadoopUtil.deleteDirIfExists(Util.getPathPrefix(feedInputPath), fs);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/78b9c1a9/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index a017c79..8bfeda4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -42,6 +42,7 @@ import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClientException;
 import org.custommonkey.xmlunit.Diff;
 import org.custommonkey.xmlunit.XMLUnit;
+import org.joda.time.DateTime;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -62,12 +63,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
     private static final Logger logger = Logger.getLogger(UpdateAtSpecificTimeTest.class);
 
     Bundle processBundle;
-
-    ColoHelper cluster_1 = servers.get(0);
-    ColoHelper cluster_2 = servers.get(1);
-    ColoHelper cluster_3 = servers.get(2);
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
     FileSystem cluster2FS = serverFS.get(1);
-
     private String dateTemplate = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private final String baseTestDir = baseHDFSDir + "/UpdateAtSpecificTimeTest-data";
     String aggregateWorkflowDir = baseHDFSDir + "/aggregator";
@@ -77,21 +76,20 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
     }
 
-
     @BeforeMethod(alwaysRun = true)
     public void setup(Method method) throws IOException {
         logger.info("test name: " + method.getName());
         Bundle bundle = BundleUtil.readLocalDCBundle();
-        bundles[0] = new Bundle(bundle, cluster_1);
-        bundles[1] = new Bundle(bundle, cluster_2);
-        bundles[2] = new Bundle(bundle, cluster_3);
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[1] = new Bundle(bundle, cluster2);
+        bundles[2] = new Bundle(bundle, cluster3);
 
         bundles[0].generateUniqueBundle();
         bundles[1].generateUniqueBundle();
         bundles[2].generateUniqueBundle();
 
         processBundle = BundleUtil.readELBundle();
-        processBundle = new Bundle(processBundle, cluster_1);
+        processBundle = new Bundle(processBundle, cluster1);
         processBundle.generateUniqueBundle();
         processBundle.setProcessWorkflow(aggregateWorkflowDir);
     }
@@ -102,15 +100,14 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         removeBundles(processBundle);
     }
 
-    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void invalidChar_Process()
         throws JAXBException, IOException, URISyntaxException,
         AuthenticationException, OozieClientException {
         processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0),
             TimeUtil.getTimeWrtSystemTime(20));
         processBundle.submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
         String oldProcess =
             processBundle.getProcessData();
         processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(5),
@@ -121,26 +118,23 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
             .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
     }
 
-    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void invalidChar_Feed()
         throws JAXBException, IOException, URISyntaxException, AuthenticationException,
         OozieClientException {
 
         String feed = submitAndScheduleFeed(processBundle);
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
+
         //update frequency
         Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes);
         String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
-
         ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null);
         Assert.assertTrue(r.getMessage()
             .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
     }
 
-
-    @Test(groups = {"singleCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void updateTimeInPast_Process()
         throws JAXBException, IOException, URISyntaxException,
         OozieClientException, AuthenticationException {
@@ -151,49 +145,35 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
 
         //get old process details
         String oldProcess = processBundle.getProcessData();
+        String oldBundleId = InstanceUtil.getLatestBundleID(cluster1,
+            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
 
-        String oldBundleId = InstanceUtil
-            .getLatestBundleID(cluster_1,
-                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, oldProcess, 0);
-
-        List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster_1,
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 0);
+        List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
             oldBundleId, EntityType.PROCESS);
 
-
         // update process by adding property
         processBundle.setProcessProperty("someProp", "someValue");
         ServiceResponse r = prism.getProcessHelper().update(oldProcess,
-            processBundle.getProcessData(),TimeUtil.getTimeWrtSystemTime(-10000), null);
+            processBundle.getProcessData(), TimeUtil.getTimeWrtSystemTime(-10000), null);
         AssertUtil.assertSucceeded(r);
 
         //check new coord created with current time
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleId, initialNominalTimes,
-            processBundle.getProcessData(), true,
-            false);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, oldProcess, 1);
-
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleId, initialNominalTimes,
-            processBundle.getProcessData(), true,
-            true);
-
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes,
+            processBundle.getProcessData(), true, false);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 1);
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes,
+            processBundle.getProcessData(), true, true);
     }
 
-    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
-
+    @Test(groups = {"MultiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void updateTimeInPast_Feed()
         throws JAXBException, IOException, OozieClientException,
         URISyntaxException, AuthenticationException {
 
-
         String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-10);
         String startTimeCluster_target = TimeUtil.getTimeWrtSystemTime(10);
-
         String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_target);
-
         logger.info("feed: " + Util.prettyPrintXml(feed));
 
         //submit and schedule feed
@@ -201,54 +181,47 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
             prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         TimeUtil.sleepSeconds(10);
         AssertUtil.assertSucceeded(r);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
 
         //update frequency
         Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes);
         String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
-
         r = prism.getFeedHelper().update(feed, updatedFeed,
             TimeUtil.getTimeWrtSystemTime(-10000), null);
         AssertUtil.assertSucceeded(r);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);
 
         //check correct number of coord exists or not
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster_1.getFeedHelper(),
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
                 Util.readEntityName(feed),
                 "REPLICATION"), 2);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster_2.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
                 "RETENTION"), 2);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster_1.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
                 "RETENTION"), 2);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster_3.getFeedHelper(), Util.readEntityName(feed),
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
                 "RETENTION"), 2);
-
     }
 
-
-    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true)
     public void inNextFewMinutesUpdate_RollForward_Process()
         throws JAXBException, IOException, URISyntaxException, JSchException,
         OozieClientException, SAXException, AuthenticationException {
-    /*
-    submit process on 3 clusters. Schedule on 2 clusters. Bring down one of
-    the scheduled cluster. Update with time 5 minutes from now. On running
-    cluster new coord should be created with start time +5 and no instance
-    should be missing. On 3rd cluster where process was only submit,
-    definition should be updated. Bring the down cluster up. Update with same
-     definition again, now the recently up cluster should also have new
-     coords.
-     */
-
+        /*
+        submit process on 3 clusters. Schedule on 2 clusters. Bring down one of
+        the scheduled cluster. Update with time 5 minutes from now. On running
+        cluster new coord should be created with start time +5 and no instance
+        should be missing. On 3rd cluster where process was only submit,
+        definition should be updated. Bring the down cluster up. Update with same
+        definition again, now the recently up cluster should also have new
+        coords.
+        */
         try {
-            Util.startService(cluster_2.getProcessHelper());
+            Util.startService(cluster2.getProcessHelper());
             String startTime = TimeUtil.getTimeWrtSystemTime(-15);
             processBundle.setProcessValidity(startTime,
                 TimeUtil.getTimeWrtSystemTime(60));
@@ -259,32 +232,28 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
             processBundle.submitBundle(prism);
 
             //schedule of 2 cluster
-            cluster_1.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
+            cluster1.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
                 processBundle.getProcessData());
-
-            cluster_2.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
+            cluster2.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL,
                 processBundle.getProcessData());
+            InstanceUtil.waitTillInstancesAreCreated(cluster2, processBundle.getProcessData(), 0);
 
-            InstanceUtil.waitTillInstancesAreCreated(cluster_2, processBundle.getProcessData(), 0);
-
-            //shut down cluster_2
-            Util.shutDownService(cluster_2.getProcessHelper());
+            //shut down cluster2
+            Util.shutDownService(cluster2.getProcessHelper());
 
             // save old data before update
             String oldProcess = processBundle.getProcessData();
             String oldBundleID_cluster1 = InstanceUtil
-                .getLatestBundleID(cluster_1,
+                .getLatestBundleID(cluster1,
                     Util.readEntityName(oldProcess), EntityType.PROCESS);
             String oldBundleID_cluster2 = InstanceUtil
-                .getLatestBundleID(cluster_2,
+                .getLatestBundleID(cluster2,
                     Util.readEntityName(oldProcess), EntityType.PROCESS);
-
             List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
-                (cluster_1,
+                (cluster1,
                     oldBundleID_cluster1, EntityType.PROCESS);
-
             List<String> oldNominalTimes_cluster2 = OozieUtil.getActionsNominalTime
-                (cluster_2,
+                (cluster2,
                     oldBundleID_cluster2, EntityType.PROCESS);
 
             //update process validity
@@ -293,219 +262,172 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
             //send update request
             String updateTime = TimeUtil.getTimeWrtSystemTime(5);
             ServiceResponse r = prism.getProcessHelper()
-                .update(oldProcess, processBundle.getProcessData(), updateTime
-                );
+                .update(oldProcess, processBundle.getProcessData(), updateTime);
             AssertUtil.assertPartial(r);
+            InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);
 
-            InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
-
-            //verify new bundle on cluster_1 and definition on cluster_3
+            //verify new bundle on cluster1 and definition on cluster3
             OozieUtil
-                .verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, oldNominalTimes_cluster1,
+                .verifyNewBundleCreation(cluster1, oldBundleID_cluster1, oldNominalTimes_cluster1,
                     oldProcess, true, false);
-
-            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
+            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
                 oldNominalTimes_cluster2,
                 oldProcess, false, false);
-
-            String definition_cluster_3 = Util.getEntityDefinition(cluster_3,
+            String definition_cluster_3 = Util.getEntityDefinition(cluster3,
                 processBundle.getProcessData(), true);
-
             Assert.assertTrue(XmlUtil.isIdentical(definition_cluster_3,
                 processBundle.getProcessData()), "Process definitions should be equal");
 
-            //start the stopped cluster_2
-            Util.startService(cluster_2.getProcessHelper());
+            //start the stopped cluster2
+            Util.startService(cluster2.getProcessHelper());
             TimeUtil.sleepSeconds(40);
-
-            String newBundleID_cluster1 = InstanceUtil
-                .getLatestBundleID(cluster_1,
-                    Util.readEntityName(oldProcess), EntityType.PROCESS);
+            String newBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
+                Util.readEntityName(oldProcess), EntityType.PROCESS);
 
             //send second update request
-            r = prism.getProcessHelper().update(oldProcess,
-                processBundle.getProcessData(),
+            r = prism.getProcessHelper().update(oldProcess, processBundle.getProcessData(),
                 updateTime, null);
             AssertUtil.assertSucceeded(r);
-
-
-            String def_cluster_2 = Util.getEntityDefinition(cluster_2,
+            String def_cluster_2 = Util.getEntityDefinition(cluster2,
                 processBundle.getProcessData(), true);
             logger.info("def_cluster_2 : " + Util.prettyPrintXml(def_cluster_2));
 
-            // verify new bundle in cluster_2 and no new bundle in cluster_1  and
-            OozieUtil
-                .verifyNewBundleCreation(cluster_1, newBundleID_cluster1, oldNominalTimes_cluster1,
-                    oldProcess, false, false);
-
-            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
-                oldNominalTimes_cluster2,
-                oldProcess, true, false);
+            // verify new bundle in cluster2 and no new bundle in cluster1  and
+            OozieUtil.verifyNewBundleCreation(cluster1, newBundleID_cluster1,
+                oldNominalTimes_cluster1, oldProcess, false, false);
+            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
+                oldNominalTimes_cluster2, oldProcess, true, false);
 
             //wait till update time is reached
             TimeUtil.sleepTill(updateTime);
-
-            OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
-                oldNominalTimes_cluster2,
-                oldProcess, true, true);
-
-            OozieUtil
-                .verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, oldNominalTimes_cluster1,
-                    oldProcess, true, true);
+            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
+                oldNominalTimes_cluster2, oldProcess, true, true);
+            OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
+                oldNominalTimes_cluster1, oldProcess, true, true);
         } finally {
-            Util.restartService(cluster_2.getProcessHelper());
+            Util.restartService(cluster2.getProcessHelper());
         }
     }
 
-    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true)
     public void inNextFewMinutesUpdate_RollForward_Feed()
         throws JAXBException, IOException, URISyntaxException, JSchException, 
         OozieClientException, SAXException, AuthenticationException {
         try {
             String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-18);
-
             String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_source);
-
             logger.info("feed: " + Util.prettyPrintXml(feed));
 
             //submit feed on all 3 clusters
             ServiceResponse r = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feed);
             AssertUtil.assertSucceeded(r);
 
-            //schedule feed of cluster_1 and cluster_2
-            r = cluster_1.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
+            //schedule feed of cluster1 and cluster2
+            r = cluster1.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
             AssertUtil.assertSucceeded(r);
-            r = cluster_2.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
+            r = cluster2.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, feed);
             AssertUtil.assertSucceeded(r);
+            InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
 
-            InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
-
-            //shutdown cluster_2
-            Util.shutDownService(cluster_2.getProcessHelper());
+            //shutdown cluster2
+            Util.shutDownService(cluster2.getProcessHelper());
 
             //add some property to feed so that new bundle is created
             String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");
 
             //save old data
-            String oldBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster_1,
+            String oldBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
                 Util.readEntityName(feed), EntityType.FEED);
-
             List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
-                (cluster_1,
-                    oldBundle_cluster1, EntityType.FEED);
+                (cluster1, oldBundle_cluster1, EntityType.FEED);
 
             //send update command with +5 mins in future
             String updateTime = TimeUtil.getTimeWrtSystemTime(5);
             r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
             AssertUtil.assertPartial(r);
 
-            //verify new bundle creation on cluster_1 and new definition on cluster_3
-            OozieUtil
-                .verifyNewBundleCreation(cluster_1, oldBundle_cluster1, oldNominalTimes_cluster1,
-                    feed, true, false);
-
-
-            String definition = Util.getEntityDefinition(cluster_3, feed, true);
+            //verify new bundle creation on cluster1 and new definition on cluster3
+            OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1,
+                oldNominalTimes_cluster1, feed, true, false);
+            String definition = Util.getEntityDefinition(cluster3, feed, true);
             Diff diff = XMLUnit.compareXML(definition, processBundle.getProcessData());
             logger.info(diff);
 
-            //start stopped cluster_2
-            Util.startService(cluster_2.getProcessHelper());
-
-            String newBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster_1,
+            //start stopped cluster2
+            Util.startService(cluster2.getProcessHelper());
+            String newBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
                 Util.readEntityName(feed), EntityType.FEED);
 
             //send update again
             r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
             AssertUtil.assertSucceeded(r);
 
-            //verify new bundle creation on cluster_2 and no new bundle on cluster_1
+            //verify new bundle creation on cluster2 and no new bundle on cluster1
             Assert.assertEquals(InstanceUtil
-                .checkIfFeedCoordExist(cluster_2.getFeedHelper(), Util.readEntityName(feed),
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
                     "RETENTION"), 2);
+            OozieUtil.verifyNewBundleCreation(cluster1, newBundle_cluster1,
+                oldNominalTimes_cluster1, feed, false, false);
 
-            OozieUtil
-                .verifyNewBundleCreation(cluster_1, newBundle_cluster1, oldNominalTimes_cluster1,
-                    feed, false, false);
             //wait till update time is reached
             TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5));
 
             //verify new bundle creation with instance matching
-            OozieUtil
-                .verifyNewBundleCreation(cluster_1, oldBundle_cluster1, oldNominalTimes_cluster1,
-                    feed, true, true);
-
+            OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1,
+                oldNominalTimes_cluster1, feed, true, true);
         } finally {
-            Util.restartService(cluster_2.getProcessHelper());
+            Util.restartService(cluster2.getProcessHelper());
         }
     }
 
-
-    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void updateTimeAfterEndTime_Process()
         throws JAXBException, InterruptedException, IOException, URISyntaxException,
         OozieClientException, AuthenticationException {
-
-    /*
-      submit and schedule process with end time after 60 mins. Set update time
-       as with +60 from start mins.
-    */
+        /* submit and schedule process with end time after 60 mins. Set update time
+           as with +60 from start mins */
         logger.info("Running test updateTimeAfterEndTime_Process");
         String startTime = TimeUtil.getTimeWrtSystemTime(-15);
         String endTime = TimeUtil.getTimeWrtSystemTime(60);
         processBundle.setProcessValidity(startTime, endTime);
         processBundle.submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(10);
-
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
             Util.readEntityName(processBundle.getProcessData()), 0,
             CoordinatorAction.Status.WAITING, EntityType.PROCESS);
 
         //save old data
         String oldProcess = processBundle.getProcessData();
-
         String oldBundleID = InstanceUtil
-            .getLatestBundleID(cluster_1,
+            .getLatestBundleID(cluster1,
                 Util.readEntityName(oldProcess), EntityType.PROCESS);
-
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster_1, oldBundleID,
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID,
             EntityType.PROCESS);
 
         //update
         processBundle.setProcessProperty("someProp", "someVal");
         String updateTime = TimeUtil.addMinsToTime(endTime, 60);
-
         logger.info("Original Feed : " + Util.prettyPrintXml(oldProcess));
         logger.info("Updated Feed :" + Util.prettyPrintXml(processBundle.getProcessData()));
         logger.info("Update Time : " + updateTime);
-
-
         ServiceResponse r = prism.getProcessHelper().update(oldProcess,
             processBundle.getProcessData(), updateTime, null);
         AssertUtil.assertSucceeded(r);
 
         //verify new bundle creation with instances matching
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
             oldProcess, true, false);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
-
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
             oldProcess, true, true);
     }
 
-    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void updateTimeAfterEndTime_Feed()
         throws JAXBException, IOException, OozieClientException,
         URISyntaxException, AuthenticationException {
-    /*
-    submit and schedule feed with end time 60 mins in future and update with
-    +60
-     in future.
-     */
+
+        /* submit and schedule feed with end time 60 mins in future and update with +60 in future*/
         String startTime = TimeUtil.getTimeWrtSystemTime(-15);
         String endTime = TimeUtil.getTimeWrtSystemTime(60);
 
@@ -514,79 +436,64 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
             XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
             ClusterType.SOURCE, null);
-
         feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime, endTime),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE),
             Util.readEntityName(processBundle.getClusters().get(0)), ClusterType.SOURCE,
             null, baseTestDir + "/replication" + dateTemplate);
 
-
         ServiceResponse r = prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
             processBundle.getClusters().get(0));
         AssertUtil.assertSucceeded(r);
-        r = prism.getFeedHelper().submitAndSchedule(Util.URLS
-            .SUBMIT_AND_SCHEDULE_URL, feed);
+        r = prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(r);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);
 
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 0);
         //save old data
-
-        String oldBundleID = InstanceUtil
-            .getLatestBundleID(cluster_1,
-                Util.readEntityName(feed), EntityType.FEED);
-
+        String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
+            Util.readEntityName(feed), EntityType.FEED);
         String updateTime = TimeUtil.addMinsToTime(endTime, 60);
         String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");
-
         logger.info("Original Feed : " + Util.prettyPrintXml(feed));
         logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed));
         logger.info("Update Time : " + updateTime);
-
         r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
         AssertUtil.assertSucceeded(r);
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);
 
         //verify new bundle creation
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, null,
-            feed, true, false);
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, null, feed, true, false);
     }
 
-    @Test(groups = {"multiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
     public void updateTimeBeforeStartTime_Process() throws JAXBException, IOException,
         URISyntaxException, OozieClientException, AuthenticationException {
 
-    /*
-      submit and schedule process with start time +10 mins from now. Update
-      with start time -4 and update time +2 mins
-     */
+        /* submit and schedule process with start time +10 mins from now. Update with start time
+        -4 and update time +2 mins */
         String startTime = TimeUtil.getTimeWrtSystemTime(10);
         String endTime = TimeUtil.getTimeWrtSystemTime(20);
         processBundle.setProcessValidity(startTime, endTime);
         processBundle.submitFeedsScheduleProcess(prism);
+
         //save old data
         String oldProcess = processBundle.getProcessData();
-        String oldBundleID = InstanceUtil
-            .getLatestBundleID(cluster_1,
-                Util.readEntityName(oldProcess), EntityType.PROCESS);
-        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster_1, oldBundleID,
+        String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
+            Util.readEntityName(oldProcess), EntityType.PROCESS);
+        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID,
             EntityType.PROCESS);
-
-        processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4),
-            endTime);
+        processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4), endTime);
         String updateTime = TimeUtil.getTimeWrtSystemTime(2);
         ServiceResponse r = prism.getProcessHelper().update(oldProcess,
             processBundle.getProcessData(), updateTime, null);
         AssertUtil.assertSucceeded(r);
         TimeUtil.sleepSeconds(10);
+
         //verify new bundle creation
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID, oldNominalTimes,
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
             oldProcess, true, false);
-
     }
 
-    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000,
-        enabled = true)
+    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000, enabled = true)
     public void updateDiffClusterDiffValidity_Process()
         throws JAXBException, IOException, URISyntaxException, OozieClientException,
         AuthenticationException {
@@ -599,10 +506,8 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         String startTime_cluster3 = TimeUtil.getTimeWrtSystemTime(-30);
         String endTime_cluster3 = TimeUtil.getTimeWrtSystemTime(180);
 
-
         //create multi cluster bundle
-        processBundle.setProcessValidity(startTime_cluster1,
-            endTime_cluster1);
+        processBundle.setProcessValidity(startTime_cluster1, endTime_cluster1);
         processBundle.addClusterToBundle(bundles[1].getClusters().get(0),
             ClusterType.SOURCE, startTime_cluster2, endTime_cluster2);
         processBundle.addClusterToBundle(bundles[2].getClusters().get(0),
@@ -612,26 +517,20 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         processBundle.submitFeedsScheduleProcess(prism);
 
         //wait for coord to be in running state
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 0);
-        InstanceUtil.waitTillInstancesAreCreated(cluster_3, processBundle.getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0);
 
         //save old info
-        String oldBundleID_cluster1 = InstanceUtil
-            .getLatestBundleID(cluster_1,
-                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
-        List<String> nominalTimes_cluster1 =
-            OozieUtil.getActionsNominalTime(cluster_1, oldBundleID_cluster1,
-                EntityType.PROCESS);
-        String oldBundleID_cluster2 = InstanceUtil
-            .getLatestBundleID(cluster_2,
-                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
-        String oldBundleID_cluster3 = InstanceUtil
-            .getLatestBundleID(cluster_3,
-                Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
-        List<String> nominalTimes_cluster3 = OozieUtil.getActionsNominalTime
-            (cluster_3, oldBundleID_cluster3,
-                EntityType.PROCESS);
-
+        String oldBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
+            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        List<String> nominalTimes_cluster1 = OozieUtil.getActionsNominalTime(cluster1,
+            oldBundleID_cluster1, EntityType.PROCESS);
+        String oldBundleID_cluster2 = InstanceUtil.getLatestBundleID(cluster2,
+            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        String oldBundleID_cluster3 = InstanceUtil.getLatestBundleID(cluster3,
+            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
+        List<String> nominalTimes_cluster3 = OozieUtil.getActionsNominalTime(cluster3,
+            oldBundleID_cluster3, EntityType.PROCESS);
 
         //update process
         String updateTime = TimeUtil.addMinsToTime(endTime_cluster1, 3);
@@ -641,44 +540,37 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         AssertUtil.assertSucceeded(r);
 
         //check for new bundle to be created
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, nominalTimes_cluster1,
-            processBundle.getProcessData(), true, false);
-        OozieUtil.verifyNewBundleCreation(cluster_3, oldBundleID_cluster3,
-            nominalTimes_cluster3,
-            processBundle.getProcessData(), true, false);
-        OozieUtil.verifyNewBundleCreation(cluster_2, oldBundleID_cluster2,
-            nominalTimes_cluster3,
-            processBundle.getProcessData(), true, false);
-
-        //wait till new coord are running on Cluster1
-        InstanceUtil.waitTillInstancesAreCreated(cluster_1, processBundle.getProcessData(), 1);
-        OozieUtil.verifyNewBundleCreation(cluster_1, oldBundleID_cluster1, nominalTimes_cluster1,
-            processBundle.getProcessData(), true, true);
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
+            nominalTimes_cluster1, processBundle.getProcessData(), true, false);
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3,
+            nominalTimes_cluster3, processBundle.getProcessData(), true, false);
+        OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
+            nominalTimes_cluster3, processBundle.getProcessData(), true, false);
+
+        //wait till new coord are running on cluster1
+        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);
+        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
+            nominalTimes_cluster1, processBundle.getProcessData(), true, true);
 
         //verify
-        String coordStartTime_cluster3 = OozieUtil.getCoordStartTime(cluster_3,
+        String coordStartTime_cluster3 = OozieUtil.getCoordStartTime(cluster3,
             processBundle.getProcessData(), 1);
-        String coordStartTime_cluster2 = OozieUtil.getCoordStartTime(cluster_2,
+        String coordStartTime_cluster2 = OozieUtil.getCoordStartTime(cluster2,
             processBundle.getProcessData(), 1);
 
-        if (!(TimeUtil.oozieDateToDate(coordStartTime_cluster3).isAfter
-            (TimeUtil.oozieDateToDate(updateTime)) || TimeUtil
-            .oozieDateToDate(coordStartTime_cluster3).isEqual
-                (TimeUtil.oozieDateToDate(updateTime))))
-            Assert.assertTrue(false, "new coord start time is not correct");
-
-        if (TimeUtil.oozieDateToDate(coordStartTime_cluster2).isEqual
-            (TimeUtil.oozieDateToDate(updateTime)))
-            Assert.assertTrue(false, "new coord start time is not correct");
-
+        DateTime updateTimeOozie = TimeUtil.oozieDateToDate(updateTime);
+        Assert.assertTrue(TimeUtil.oozieDateToDate(coordStartTime_cluster3).isAfter(updateTimeOozie)
+            || TimeUtil.oozieDateToDate(coordStartTime_cluster3).isEqual(updateTimeOozie),
+            "new coord start time is not correct");
+        Assert.assertFalse(
+            TimeUtil.oozieDateToDate(coordStartTime_cluster2).isEqual(updateTimeOozie),
+            "new coord start time is not correct");
         TimeUtil.sleepTill(updateTime);
-
-        InstanceUtil.waitTillInstancesAreCreated(cluster_3, processBundle.getProcessData(), 1);
+        InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 1);
 
         //verify that no instance are missing
-        OozieUtil.verifyNewBundleCreation(cluster_3, oldBundleID_cluster3,
-            nominalTimes_cluster3,
-            processBundle.getProcessData(), true, true);
+        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3,
+            nominalTimes_cluster3, processBundle.getProcessData(), true, true);
     }
 
     private String submitAndScheduleFeed(Bundle b)
@@ -700,11 +592,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         r = prism.getFeedHelper().submitAndSchedule(Util.URLS
             .SUBMIT_AND_SCHEDULE_URL, feed);
         AssertUtil.assertSucceeded(r);
-
         return feed;
     }
 
-
     private String getMultiClusterFeed(String startTimeCluster_source,
                                        String startTimeCluster_target)
         throws IOException, URISyntaxException, AuthenticationException {
@@ -713,25 +603,20 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         //create desired feed
         String feed = bundles[0].getDataSets().get(0);
 
-        //cluster_1 is target, cluster_2 is source and cluster_3 is neutral
-
+        //cluster1 is target, cluster2 is source and cluster3 is neutral
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
             ClusterType.SOURCE, null);
-
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity(startTimeCluster_source, "2099-10-01T12:10Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE),
             Util.readEntityName(bundles[2].getClusters().get(0)), null, null);
-
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity(startTimeCluster_target, "2099-10-01T12:25Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE),
             Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
-            null,
-            testDataDir + dateTemplate);
-
+            null, testDataDir + dateTemplate);
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity(startTimeCluster_source, "2099-01-01T00:00Z"),
             XmlUtil.createRtention("days(100000)", ActionType.DELETE),
@@ -741,7 +626,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         //submit clusters
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
 
-        //create test data on cluster_2
+        //create test data on cluster2
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeCluster_source,
             TimeUtil.getTimeWrtSystemTime(60), 1);
         HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE,


[3/3] git commit: FALCON-633 RetryTests and Retentions tests should stop using root dir contributed by Raghav Kumar Gautam

Posted by sa...@apache.org.
FALCON-633 RetryTests and Retentions tests should stop using root dir (contributed by Raghav Kumar Gautam)


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/ffe18b0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/ffe18b0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/ffe18b0c

Branch: refs/heads/master
Commit: ffe18b0ce3092fceffccff215297f6b7e2706550
Parents: 5dfe5cd
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Thu Aug 28 11:36:20 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Thu Aug 28 11:36:20 2014 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |  4 +
 .../falcon/regression/core/util/BundleUtil.java |  9 +--
 .../apache/falcon/regression/NewRetryTest.java  | 79 +++++++++++---------
 .../falcon/regression/prism/RetentionTest.java  |  2 +-
 4 files changed, 51 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ffe18b0c/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index faf7e02..7a3b84f 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -8,6 +8,10 @@ Trunk (Unreleased)
    FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G 
    via Samarth Gupta)
   IMPROVEMENTS
+
+   FALCON-633 RetryTests and Retentions tests should stop using root dir
+   (Raghav Kumar Gautam via Samarth Gupta)
+
    FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
 
    FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ffe18b0c/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 1f73523..d5790c4 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -48,14 +48,13 @@ public final class BundleUtil {
         return readBundleFromFolder("LateDataBundles");
     }
 
-    public static Bundle readRetryBundle() throws IOException {
-        return readBundleFromFolder("RetryTests");
+    public static Bundle readRetryBundle(String appPath, String testName) throws IOException {
+        return generateBundleFromTemplate("RetryTests", appPath, testName);
     }
 
-    public static Bundle readRetentionBundle() throws IOException {
-        return readBundleFromFolder("RetentionBundles");
+    public static Bundle readRetentionBundle(String appPath, String testName) throws IOException {
+        return generateBundleFromTemplate("RetentionBundles", appPath, testName);
     }
-
     public static Bundle readELBundle() throws IOException {
         return readBundleFromFolder("ELbundle");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ffe18b0c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index 5ab3dfe..8bcc797 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@ -74,8 +74,11 @@ public class NewRetryTest extends BaseTestClass {
     DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
     final private String baseTestDir = baseHDFSDir + "/NewRetryTest";
     final private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    final private String lateDir = baseTestDir + "/lateDataTest/testFolders/";
-    final private String latePath = lateDir + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    final private String lateInputDir = baseTestDir + "/lateDataTest/inputFolders/";
+    final private String lateInputPath = lateInputDir + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    final private String lateOutputDir = baseTestDir + "/lateDataTest/outputFolders/";
+    final private String lateOutputPath = lateOutputDir
+        + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private DateTime startDate;
     private DateTime endDate;
 
@@ -86,15 +89,17 @@ public class NewRetryTest extends BaseTestClass {
 
     @BeforeMethod(alwaysRun = true)
     public void setUp(Method method) throws Exception {
-        bundles[0] = new Bundle(BundleUtil.readRetryBundle(), cluster);
+        bundles[0] = new Bundle(
+            BundleUtil.readRetryBundle(baseAppHDFSDir, this.getClass().getSimpleName()), cluster);
         bundles[0].generateUniqueBundle();
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
         endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
         bundles[0].setProcessValidity(startDate, endDate);
 
+        bundles[0].setOutputFeedLocationData(lateOutputPath);
         String feed =
-            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
+            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), lateInputPath);
         feed = Util.insertLateFeedValue(feed, new Frequency("minutes(8)"));
         bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
         bundles[0].getDataSets().add(feed);
@@ -126,8 +131,8 @@ public class NewRetryTest extends BaseTestClass {
         } else {
             AssertUtil.assertSucceeded(response);
             // lets create data now:
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
 
             //schedule process
             AssertUtil.assertSucceeded(
@@ -177,8 +182,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             //now wait till the process is over
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
@@ -235,8 +240,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
 
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
@@ -288,8 +293,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
 
@@ -344,8 +349,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -392,8 +397,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -444,8 +449,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -498,8 +503,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -550,8 +555,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -589,8 +594,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
 
@@ -643,8 +648,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over
@@ -680,7 +685,7 @@ public class NewRetryTest extends BaseTestClass {
     public void testRetryInSuspendedAndResumeCaseWithLateData(Retry retry) throws Exception {
 
         String feed =
-            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
+            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), lateInputPath);
         feed = Util.insertLateFeedValue(feed, new Frequency("minutes(10)"));
         bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
         bundles[0].getDataSets().add(feed);
@@ -698,8 +703,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             String bundleId = OozieUtil.getBundles(clusterOC,
@@ -774,7 +779,7 @@ public class NewRetryTest extends BaseTestClass {
     public void testRetryInLateDataCase(Retry retry) throws Exception {
 
         String feed =
-            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
+            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), lateInputPath);
 
         feed = Util.insertLateFeedValue(feed, getFrequency(retry));
 
@@ -795,11 +800,11 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             List<String> initialData =
                 Util.getHadoopDataFromDir(clusterFS, bundles[0].getInputFeedFromBundle(),
-                    lateDir);
+                    lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             String bundleId = OozieUtil.getBundles(clusterOC,
@@ -837,7 +842,7 @@ public class NewRetryTest extends BaseTestClass {
             String insertionFolder =
                 Util.findFolderBetweenGivenTimeStamps(now, now.plusMinutes(5), initialData);
             logger.info("inserting data in folder " + insertionFolder + " at " + DateTime.now());
-            HadoopUtil.injectMoreData(clusterFS, lateDir + insertionFolder,
+            HadoopUtil.injectMoreData(clusterFS, lateInputDir + insertionFolder,
                     OSUtil.OOZIE_EXAMPLE_INPUT_DATA + "lateData");
             //now to validate all failed instances to check if they were retried or not.
             validateRetry(clusterOC, bundleId,
@@ -854,7 +859,7 @@ public class NewRetryTest extends BaseTestClass {
     public void testRetryInDeleteAfterPartialRetryCase(Retry retry) throws Exception {
 
         String feed =
-            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
+            Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), lateInputPath);
         feed = Util.insertLateFeedValue(feed, new Frequency("minutes(1)"));
         bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
         bundles[0].getDataSets().add(feed);
@@ -873,8 +878,8 @@ public class NewRetryTest extends BaseTestClass {
             AssertUtil.assertFailed(response);
         } else {
             AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+            HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
             AssertUtil.assertSucceeded(
                 prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
             //now wait till the process is over

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ffe18b0c/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 1d900d9..b288b77 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -72,7 +72,7 @@ public class RetentionTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void testName(Method method) throws Exception {
         logger.info("test name: " + method.getName());
-        Bundle bundle = BundleUtil.readRetentionBundle();
+        Bundle bundle = BundleUtil.readRetentionBundle(baseAppHDFSDir, this.getClass().getSimpleName());
         bundles[0] = new Bundle(bundle, cluster);
         bundles[0].setInputFeedDataPath(testHDFSDir);
         bundles[0].generateUniqueBundle();


[2/3] git commit: FALCON-632 Refactoring, documentation stuff contributed by Paul Isaychuk

Posted by sa...@apache.org.
FALCON-632 Refactoring, documentation stuff contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5dfe5cde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5dfe5cde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5dfe5cde

Branch: refs/heads/master
Commit: 5dfe5cdef7521d2e41de32d8abd35f9a389ae82a
Parents: 78b9c1a
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Thu Aug 28 11:33:05 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Thu Aug 28 11:33:05 2014 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../regression/EmbeddedPigScriptTest.java       |  60 +++----
 .../regression/FeedInstanceStatusTest.java      | 170 ++++++++-----------
 .../regression/ProcessInstanceKillsTest.java    |  62 +++----
 .../regression/ProcessInstanceRerunTest.java    | 127 ++++++--------
 .../falcon/regression/ProcessLibPathTest.java   |  40 ++---
 .../falcon/regression/prism/RetentionTest.java  |  67 ++++++--
 7 files changed, 239 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 5ed6a89..faf7e02 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -8,6 +8,8 @@ Trunk (Unreleased)
    FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G 
    via Samarth Gupta)
   IMPROVEMENTS
+   FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
+
    FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored
    (Paul Isaychuk via Samarth Gupta)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index 1973bf8..0d89fac 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -69,28 +69,24 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
     String inputPath = pigTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
     private static final Logger logger = Logger.getLogger(EmbeddedPigScriptTest.class);
     private static final double TIMEOUT = 15;
+    String processName;
+    String process;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
-
         logger.info("in @BeforeClass");
+
         //copy pig script
         HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.RESOURCES + "pig");
-
         Bundle bundle = BundleUtil.readELBundle();
         bundle.generateUniqueBundle();
         bundle = new Bundle(bundle, cluster);
-
         String startDate = "2010-01-02T00:40Z";
         String endDate = "2010-01-02T01:10Z";
-
         bundle.setInputFeedDataPath(inputPath);
         prefix = bundle.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
-        List<String> dataDates =
-            TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
     }
 
@@ -120,6 +116,8 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
         processElement.getWorkflow().setEngine(EngineType.PIG);
         bundles[0].setProcessData(processElement.toString());
         bundles[0].submitFeedsScheduleProcess(prism);
+        process = bundles[0].getProcessData();
+        processName = Util.readEntityName(process);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -129,70 +127,56 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
 
     @Test(groups = {"singleCluster"})
     public void getResumedProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
-            Job.Status.RUNNING);
-        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process);
         TimeUtil.sleepSeconds(TIMEOUT);
-        ServiceResponse status =
-            prism.getProcessHelper().getStatus(URLS.STATUS_URL, bundles[0].getProcessData());
+        ServiceResponse status = prism.getProcessHelper().getStatus(URLS.STATUS_URL, process);
         Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
-        prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
+        prism.getProcessHelper().resume(URLS.RESUME_URL, process);
         TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
-            Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
         InstancesResult r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+            .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
     }
 
     @Test(groups = {"singleCluster"})
     public void getSuspendedProcessInstance() throws Exception {
-        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process);
         TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
-            Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
         InstancesResult r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+            .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         InstanceUtil.validateSuccessWOInstances(r);
     }
 
     @Test(groups = {"singleCluster"})
     public void getRunningProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
-            Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
         InstancesResult r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+            .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
     }
 
     @Test(groups = {"singleCluster"})
     public void getKilledProcessInstance() throws Exception {
-        prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+        prism.getProcessHelper().delete(URLS.DELETE_URL, process);
         InstancesResult r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+            .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
             "Unexpected status code");
     }
 
     @Test(groups = {"singleCluster"})
     public void getSucceededProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
-            Job.Status.RUNNING);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
         InstancesResult r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+            .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-
         int counter = OSUtil.IS_WINDOWS ? 100 : 50;
         InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
             .getProcessData()), Job.Status.SUCCEEDED, counter);
-        r = prism.getProcessHelper()
-            .getRunningInstance(URLS.INSTANCE_RUNNING,
-                Util.readEntityName(bundles[0].getProcessData()));
+        r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING, processName);
         InstanceUtil.validateSuccessWOInstances(r);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
index ff227d6..acf3bb3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
@@ -82,27 +82,29 @@ public class FeedInstanceStatusTest extends BaseTestClass {
         removeBundles();
     }
 
+    /**
+     * Goes through the whole feed replication workflow checking its instances status while
+     * submitting feed, scheduling it, performing different combinations of actions like
+     * -submit, -resume, -kill, -rerun.
+     */
     @Test(groups = {"multiCluster"})
     public void feedInstanceStatus_running() throws Exception {
         bundles[0].setInputFeedDataPath(feedInputPath);
 
         logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
-        ServiceResponse r = prism.getClusterHelper()
-            .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
-        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+        AssertUtil.assertSucceeded(prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0)));
 
         logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
-        r = prism.getClusterHelper()
-            .submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
-        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+        AssertUtil.assertSucceeded(prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0)));
 
         logger.info("cluster bundle3: " + Util.prettyPrintXml(bundles[2].getClusters().get(0)));
-        r = prism.getClusterHelper()
-            .submitEntity(URLS.SUBMIT_URL, bundles[2].getClusters().get(0));
-        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+        AssertUtil.assertSucceeded(prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[2].getClusters().get(0)));
 
         String feed = bundles[0].getDataSets().get(0);
+        String feedName = Util.readEntityName(feed);
         feed = InstanceUtil.setFeedCluster(feed,
             XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
             XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
@@ -110,7 +112,7 @@ public class FeedInstanceStatusTest extends BaseTestClass {
         String startTime = TimeUtil.getTimeWrtSystemTime(-50);
 
         feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
-                TimeUtil.addMinsToTime(startTime, 65)),
+            TimeUtil.addMinsToTime(startTime, 65)),
             XmlUtil.createRtention("hours(10)", ActionType.DELETE),
             Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
             "US/${cluster.colo}");
@@ -126,48 +128,38 @@ public class FeedInstanceStatusTest extends BaseTestClass {
             Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
             "UK/${cluster.colo}");
 
-
         logger.info("feed: " + Util.prettyPrintXml(feed));
 
         //status before submit
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 100) + "&end=" +
-                TimeUtil.addMinsToTime(startTime, 120));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 100)
+                + "&end=" + TimeUtil.addMinsToTime(startTime, 120));
 
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed));
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 100));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
 
         AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
 
         // both replication instances
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 100));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
 
         // single instance at -30
-        prism.getFeedHelper().getProcessInstanceStatus(Util.readEntityName(feed),
-            "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 20));
 
         //single at -10
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //single at 10
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //single at 30
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         String postFix = "/US/" + cluster2.getClusterHelper().getColo();
         String prefix = bundles[0].getFeedDataPathPrefix();
@@ -180,102 +172,74 @@ public class FeedInstanceStatusTest extends BaseTestClass {
         HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix);
 
         // both replication instances
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 100));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
 
         // single instance at -30
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 20));
 
         //single at -10
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //single at 10
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //single at 30
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
 
         logger.info("Wait till feed goes into running ");
 
         //suspend instances -10
-        prism.getFeedHelper()
-            .getProcessInstanceSuspend(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 40));
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20) + "&end=" +
-                TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceSuspend(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+                + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //resuspend -10 and suspend -30 source specific
-        prism.getFeedHelper()
-            .getProcessInstanceSuspend(Util.readEntityName(feed),
-                "?start=" + TimeUtil
-                    .addMinsToTime(startTime, 20) + "&end=" +
-                    TimeUtil.addMinsToTime(startTime, 40));
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20) + "&end=" +
-                TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceSuspend(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+                + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+                + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //resume -10 and -30
-        prism.getFeedHelper()
-            .getProcessInstanceResume(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20) + "&end=" +
-                TimeUtil.addMinsToTime(startTime, 40));
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 20) + "&end=" +
-                TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceResume(feedName, "?start=" + TimeUtil
+            .addMinsToTime(startTime, 20) + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
+        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + TimeUtil
+            .addMinsToTime(startTime, 20) + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
 
         //get running instances
-        prism.getFeedHelper().getRunningInstance(URLS.INSTANCE_RUNNING, Util.readEntityName(feed));
+        prism.getFeedHelper().getRunningInstance(URLS.INSTANCE_RUNNING, feedName);
 
         //rerun succeeded instance
-        prism.getFeedHelper()
-            .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 20));
+        prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
+        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startTime
+            + "&end=" + TimeUtil.addMinsToTime(startTime, 20));
 
         //kill instance
-        prism.getFeedHelper()
-            .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + TimeUtil
-                .addMinsToTime(startTime, 44));
-        prism.getFeedHelper()
-            .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + startTime);
+        prism.getFeedHelper().getProcessInstanceKill(feedName,
+            "?start=" + TimeUtil.addMinsToTime(startTime, 44));
+        prism.getFeedHelper().getProcessInstanceKill(feedName, "?start=" + startTime);
 
         //end time should be less than end of validity i.e startTime + 110
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 110));
-
+        prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
 
         //rerun killed instance
-        prism.getFeedHelper()
-            .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
-        prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 110));
+        prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
+        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startTime
+            + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
 
         //kill feed
         prism.getFeedHelper().delete(URLS.DELETE_URL, feed);
-        InstancesResult responseInstance = prism.getFeedHelper()
-            .getProcessInstanceStatus(Util.readEntityName(feed),
-                "?start=" + startTime + "&end=" + TimeUtil
-                    .addMinsToTime(startTime, 110));
+        InstancesResult responseInstance = prism.getFeedHelper().getProcessInstanceStatus(feedName,
+            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
 
         logger.info(responseInstance.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index e7b2616..e4129e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -55,29 +55,25 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
     private String testDir = "/ProcessInstanceKillsTest";
     private String baseTestHDFSDir = baseHDFSDir + testDir;
     private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
-    private String feedInputPath = baseTestHDFSDir +
-        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
-    private String feedOutputPath =
-        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedInputPath = baseTestHDFSDir + "/input" + datePattern;
+    private String feedOutputPath = baseTestHDFSDir + "/output-data" + datePattern;
     private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class);
     private static final double TIMEOUT = 15;
+    String processName;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
         LOGGER.info("in @BeforeClass");
         HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
         Bundle b = BundleUtil.readELBundle();
         b.generateUniqueBundle();
         b = new Bundle(b, cluster);
-
         String startDate = "2010-01-01T23:20Z";
         String endDate = "2010-01-02T01:21Z";
-
         b.setInputFeedDataPath(feedInputPath);
         String prefix = b.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
     }
@@ -85,12 +81,12 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void setup(Method method) throws Exception {
         LOGGER.info("test name: " + method.getName());
-
         bundles[0] = BundleUtil.readELBundle();
         bundles[0] = new Bundle(bundles[0], cluster);
         bundles[0].generateUniqueBundle();
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].setInputFeedDataPath(feedInputPath);
+        processName = Util.readEntityName(bundles[0].getProcessData());
     }
 
     @AfterMethod(alwaysRun = true)
@@ -115,8 +111,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(TIMEOUT);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
     }
 
@@ -139,8 +134,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(TIMEOUT);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
         InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
     }
 
@@ -162,8 +156,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(TIMEOUT);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
         LOGGER.info(r.toString());
     }
@@ -196,9 +189,8 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
         String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
-        InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=" + startTimeRequest + "&end=" + endTimeRequest);
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+            "?start=" + startTimeRequest + "&end=" + endTimeRequest);
         LOGGER.info(r.toString());
     }
 
@@ -222,8 +214,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         String startTime = TimeUtil.getTimeWrtSystemTime(1);
         String endTime = TimeUtil.getTimeWrtSystemTime(40);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=" + startTime + "&end=" + endTime);
+            .getProcessInstanceKill(processName, "?start=" + startTime + "&end=" + endTime);
         LOGGER.info(r.getMessage());
         Assert.assertEquals(r.getInstances(), null);
     }
@@ -245,12 +236,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(TIMEOUT);
         prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
         TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult result = prism.getProcessHelper()
-            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
         InstanceUtil.validateResponse(result, 5, 2, 0, 0, 3);
     }
 
@@ -269,13 +258,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
         TimeUtil.sleepSeconds(TIMEOUT);
-        prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:20Z");
+        prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:20Z");
         TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult result = prism.getProcessHelper()
-            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
         InstanceUtil.validateResponse(result, 5, 4, 0, 0, 1);
     }
 
@@ -294,11 +280,9 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
         prism.getProcessHelper()
-            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+            .getProcessInstanceSuspend(processName, "?start=2010-01-02T01:00Z");
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
     }
 
@@ -316,15 +300,13 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
-            .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED);
     }
 
-
     @AfterClass(alwaysRun = true)
     public void deleteData() throws Exception {
         LOGGER.info("in @AfterClass");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index 119d871..df65a79 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -53,30 +53,28 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
 
     private String baseTestDir = baseHDFSDir + "/ProcessInstanceRerunTest";
     private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private String feedInputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
-    private String feedOutputPath = baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
-    private String feedInputTimedOutPath =
-        baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
-
+    private String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedInputPath = baseTestDir + "/input" + datePattern;
+    private String feedOutputPath = baseTestDir + "/output-data" + datePattern;
+    private String feedInputTimedOutPath = baseTestDir + "/timedout" + datePattern;
     private ColoHelper cluster = servers.get(0);
     private FileSystem clusterFS = serverFS.get(0);
     private OozieClient clusterOC = serverOC.get(0);
     private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class);
     private static final double TIMEOUT = 10;
+    private String processName;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
         LOGGER.info("in @BeforeClass");
         HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
         Bundle b = BundleUtil.readELBundle();
-
         b = new Bundle(b, cluster);
         String startDate = "2010-01-02T00:40Z";
         String endDate = "2010-01-02T01:20Z";
         b.setInputFeedDataPath(feedInputPath);
         String prefix = b.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
     }
@@ -89,6 +87,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].generateUniqueBundle();
         bundles[0].setInputFeedDataPath(feedInputPath);
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        processName = bundles[0].getProcessName();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -111,17 +110,15 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
+        String process = bundles[0].getProcessData();
         TimeUtil.sleepSeconds(TIMEOUT);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
         InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
-        List<String> wfIDs =
-            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        prism.getProcessHelper().getProcessInstanceRerun(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
     }
 
@@ -137,18 +134,16 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
-        LOGGER.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        String process = bundles[0].getProcessData();
+        LOGGER.info("process: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
-        List<String> wfIDs =
-            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        List<String> wfIDs =  InstanceUtil.getWorkflows(cluster, processName);
+        prism.getProcessHelper().
+            getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
     }
 
@@ -166,16 +161,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String process = bundles[0].getProcessData();
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
         InstancesResult r = prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+            .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
-        List<String> wfIDs =
-            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        prism.getProcessHelper().getProcessInstanceRerun(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         TimeUtil.sleepSeconds(TIMEOUT);
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
     }
@@ -193,15 +186,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        prism.getProcessHelper()
-            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
-        String wfID = InstanceUtil.getWorkflows(cluster,
-            Util.getProcessName(bundles[0].getProcessData()), Status.KILLED).get(0);
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+        String process = bundles[0].getProcessData();
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+        String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0);
+        prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
         Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
     }
 
@@ -219,15 +208,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(6);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        String wfID = InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0]
-            .getProcessData()), Status.RUNNING, Status.SUCCEEDED).get(0);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 0, CoordinatorAction
+        String process = bundles[0].getProcessData();
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.RUNNING,
+               Status.SUCCEEDED).get(0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
             .Status.SUCCEEDED, EntityType.PROCESS);
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z");
+        prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
         Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
     }
 
@@ -245,14 +232,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(2);
         bundles[0].submitFeedsScheduleProcess(prism);
-        prism.getProcessHelper()
-            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
-        Assert.assertEquals(InstanceUtil
-            .getInstanceStatus(cluster, Util.getProcessName(bundles[0].getProcessData()), 0, 1),
+        prism.getProcessHelper().getProcessInstanceSuspend(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+        prism.getProcessHelper().getProcessInstanceRerun(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+        Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1),
             CoordinatorAction.Status.SUSPENDED);
     }
 
@@ -269,14 +253,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
-            .getProcessData()), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        List<String> wfIDs =
-            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        String process = bundles[0].getProcessData();
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+        prism.getProcessHelper().getProcessInstanceRerun(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
     }
 
@@ -297,13 +280,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
         CoordinatorAction.Status s;
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
-            .getProcessData()), 1, CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
-        prism.getProcessHelper()
-            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
-        s = InstanceUtil
-            .getInstanceStatus(cluster, Util.readEntityName(bundles[0].getProcessData()), 0, 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+            CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
+        prism.getProcessHelper().getProcessInstanceRerun(processName,
+            "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0);
         Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
             "instance should have been in WAITING state");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index fc8e4a8..7647d15 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -52,45 +52,43 @@ public class ProcessLibPathTest extends BaseTestClass {
     String testDir = baseHDFSDir + "/ProcessLibPath";
     String testLibDir = testDir + "/TestLib";
     private static final Logger logger = Logger.getLogger(ProcessLibPathTest.class);
+    String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    String processName;
+    String process;
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
-
         logger.info("in @BeforeClass");
+
         //common lib for both test cases
         HadoopUtil.uploadDir(clusterFS, testLibDir, OSUtil.RESOURCES_OOZIE + "lib");
-
         Bundle b = BundleUtil.readELBundle();
         b.generateUniqueBundle();
         b = new Bundle(b, cluster);
-
         String startDate = "2010-01-01T22:00Z";
         String endDate = "2010-01-02T03:00Z";
-
-        b.setInputFeedDataPath(testDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        b.setInputFeedDataPath(testDir + "/input" + datePattern);
         String prefix = b.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
     }
 
-
     @BeforeMethod(alwaysRun = true)
     public void testName(Method method) throws Exception {
         logger.info("test name: " + method.getName());
         bundles[0] = BundleUtil.readELBundle();
         bundles[0] = new Bundle(bundles[0], cluster);
         bundles[0].generateUniqueBundle();
-        bundles[0].setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setInputFeedDataPath(baseHDFSDir + datePattern);
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
-        bundles[0].setOutputFeedLocationData(
-            baseHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setOutputFeedLocationData(baseHDFSDir + "/output-data" + datePattern);
         bundles[0].setProcessConcurrency(1);
         bundles[0].setProcessLibPath(testLibDir);
+        process = bundles[0].getProcessData();
+        processName = Util.readEntityName(process);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -109,13 +107,11 @@ public class ProcessLibPathTest extends BaseTestClass {
         HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
         HadoopUtil.deleteDirIfExists(workflowDir + "/lib", clusterFS);
         bundles[0].setProcessWorkflow(workflowDir);
-        logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        logger.info("processData: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
-                .getProcessData()), 0);
-        InstanceUtil
-            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
     }
 
     /**
@@ -131,12 +127,10 @@ public class ProcessLibPathTest extends BaseTestClass {
         HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib",
             OSUtil.RESOURCES + "ivory-oozie-lib-0.1.jar");
         bundles[0].setProcessWorkflow(workflowDir);
-        logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        logger.info("processData: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
-                .getProcessData()), 0);
-        InstanceUtil
-            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 85bd770..1d900d9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -65,7 +65,6 @@ public class RetentionTest extends BaseTestClass {
     String baseTestHDFSDir = baseHDFSDir + "/RetentionTest/";
     String testHDFSDir = baseTestHDFSDir + TEST_FOLDERS;
     private static final Logger logger = Logger.getLogger(RetentionTest.class);
-
     ColoHelper cluster = servers.get(0);
     FileSystem clusterFS = serverFS.get(0);
     OozieClient clusterOC = serverOC.get(0);
@@ -85,12 +84,26 @@ public class RetentionTest extends BaseTestClass {
         removeBundles();
     }
 
+    /**
+     * Particular test case for https://issues.apache.org/jira/browse/FALCON-321
+     * @throws Exception
+     */
     @Test
     public void testRetentionWithEmptyDirectories() throws Exception {
-        // test for https://issues.apache.org/jira/browse/FALCON-321
         testRetention(24, RetentionUnit.HOURS, true, FeedType.DAILY, false);
     }
 
+    /**
+     * Tests retention with different parameters. Validates its results based on expected and
+     * actual retained data.
+     *
+     * @param retentionPeriod period for which data should be retained
+     * @param retentionUnit type of retention limit attribute
+     * @param gaps defines gaps within list of data folders
+     * @param feedType feed type
+     * @param withData should folders be filled with data or not
+     * @throws Exception
+     */
     @Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
     public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
         final boolean gaps, final FeedType feedType, final boolean withData) throws Exception {
@@ -125,16 +138,28 @@ public class RetentionTest extends BaseTestClass {
         if (gap) {
             skip = gaps[new Random().nextInt(gaps.length)];
         }
-
         final DateTime today = new DateTime(DateTimeZone.UTC);
         final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
             feedType.addTime(today, -36), feedType.addTime(today, 36), skip, feedType);
         final List<String> dataDates = TimeUtil.convertDatesToString(times, feedType.getFormatter());
         logger.info("dataDates = " + dataDates);
-
         HadoopUtil.replenishData(clusterFS, testHDFSDir, dataDates, withData);
     }
 
+    /**
+     * Schedules a feed and waits till retention succeeds. Validates which data was
+     * removed and which was retained.
+     *
+     * @param feed analyzed retention feed
+     * @param feedType feed type
+     * @param retentionUnit type of retention limit attribute
+     * @param retentionPeriod period for which data should be retained
+     * @throws OozieClientException
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws AuthenticationException
+     * @throws JMSException
+     */
     private void commonDataRetentionWorkflow(String feed, FeedType feedType,
         RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
         IOException, URISyntaxException, AuthenticationException, JMSException {
@@ -148,22 +173,20 @@ public class RetentionTest extends BaseTestClass {
         JmsMessageConsumer messageConsumer = new JmsMessageConsumer("FALCON." + feedName,
                 cluster.getClusterHelper().getActiveMQ());
         messageConsumer.start();
-
         final DateTime currentTime = new DateTime(DateTimeZone.UTC);
         String bundleId = OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0);
 
         List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
         logger.info("workflows: " + workflows);
-
         messageConsumer.interrupt();
         Util.printMessageData(messageConsumer);
+
         //now look for cluster data
         List<String> finalData = Util.getHadoopDataFromDir(clusterFS, feed, testHDFSDir);
 
         //now see if retention value was matched to as expected
         List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
             retentionPeriod, feedType);
-
         logger.info("initialData = " + initialData);
         logger.info("finalData = " + finalData);
         logger.info("expectedOutput = " + expectedOutput);
@@ -171,23 +194,31 @@ public class RetentionTest extends BaseTestClass {
         final List<String> missingData = new ArrayList<String>(initialData);
         missingData.removeAll(expectedOutput);
         validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData);
-
         Assert.assertEquals(finalData.size(), expectedOutput.size(),
-            "sizes of outputs are different! please check");
+            "Expected and actual sizes of retained data are different! Please check.");
 
         Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
             expectedOutput.toArray(new String[expectedOutput.size()])));
     }
 
+    /**
+     * Validates by comparing the data expected to be removed with the data
+     * mentioned in messages from ActiveMQ.
+     *
+     * @param feedName feed name
+     * @param messages messages from ActiveMQ
+     * @param missingData data which is expected to be removed after retention succeeded
+     * @throws OozieClientException
+     * @throws JMSException
+     */
     private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages,
         List<String> missingData) throws OozieClientException, JMSException {
         //just verify that each element in queue is same as deleted data!
         List<String> workflowIds = OozieUtil.getWorkflowJobs(cluster,
                 OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
 
-        //create queuedata folderList:
+        //create queue data folderList:
         List<String> deletedFolders = new ArrayList<String>();
-
         for (MapMessage message : messages) {
             if (message != null) {
                 Assert.assertEquals(message.getString("entityName"), feedName);
@@ -205,7 +236,6 @@ public class RetentionTest extends BaseTestClass {
                     cluster.getFeedHelper().getActiveMQ());
             }
         }
-
         Assert.assertEquals(deletedFolders.size(), missingData.size(),
             "Output size is different than expected!");
         Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]),
@@ -213,6 +243,16 @@ public class RetentionTest extends BaseTestClass {
             "The missing data and message for delete operation don't correspond");
     }
 
+    /**
+     * Evaluates the amount of data expected to be retained.
+     *
+     * @param inputData initial data on cluster
+     * @param currentTime current date
+     * @param retentionUnit type of retention limit attribute
+     * @param retentionPeriod period for which data should be retained
+     * @param feedType feed type
+     * @return list of data folders which are expected to be present on cluster
+     */
     private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
         RetentionUnit retentionUnit, int retentionPeriod, FeedType feedType) {
         final List<String> finalData = new ArrayList<String>();
@@ -232,6 +272,9 @@ public class RetentionTest extends BaseTestClass {
 
     final static int[] gaps = new int[]{2, 4, 5, 1};
 
+    /**
+     * Provides different sets of parameters for retention workflow.
+     */
     @DataProvider(name = "betterDP")
     public Object[][] getTestData(Method m) {
         // a negative value like -4 should be covered in validation scenarios.