Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:50 UTC

[04/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
deleted file mode 100644
index 03bc358..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ /dev/null
@@ -1,1182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.PolicyType;
-import org.apache.falcon.entity.v0.process.Retry;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.MatrixUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for process retry behavior.
- */
-@Test(groups = "embedded")
-public class NewRetryTest extends BaseTestClass {
-
-    private static final Logger LOGGER = Logger.getLogger(NewRetryTest.class);
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-
-    private DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
-    private final String baseTestDir = cleanAndGetTestDir();
-    private final String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private final String lateDir = baseTestDir + "/lateDataTest/testFolders";
-    private final String latePath = lateDir + MINUTE_DATE_PATTERN;
-    private DateTime startDate;
-    private DateTime endDate;
-
-    @BeforeClass(alwaysRun = true)
-    public void uploadWorkflow() throws Exception {
-        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = new Bundle(BundleUtil.readRetryBundle(), cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
-        endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
-        bundles[0].setProcessValidity(TimeUtil.dateToOozieDate(startDate.toDate()),
-            TimeUtil.dateToOozieDate(endDate.toDate()));
-
-        FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
-        bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
-        bundles[0].getDataSets().add(feed.toString());
-        bundles[0].setOutputFeedLocationData(baseTestDir + "/output" + MINUTE_DATE_PATTERN);
-        bundles[0].submitClusters(prism);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
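-    /**
-     * Schedules a process with a retry policy, waits for some instances to start, then updates
-     * the process with zero retry attempts. The update must not create a new bundle, and failed
-     * instances should still be retried per the original (default) attempt count.
-     */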
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = true)
-    public void testRetryInProcessZeroAttemptUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            // let's create the input data now:
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-
-            //schedule process
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-
-            int defaultRetries = bundles[0].getProcessObject().getRetry().getAttempts();
-
-            retry.setAttempts(0);
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            prism.getProcessHelper()
-                .update(bundles[0].getProcessData(), bundles[0].getProcessData());
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId, defaultRetries);
-            checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-        }
-    }
-
-
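-    /**
-     * After the first failure retry, lowers the retry attempts by two via an update. The bundle
-     * id must stay the same and retries should converge to the lowered attempt count.
-     */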
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = true)
-    public void testRetryInProcessLowerAttemptUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            //now wait till the process is over
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            for (int attempt = 0;
-                 attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-                "Failure Retry validation failed");
-
-
-            retry.setAttempts(retry.getAttempts() - 2);
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-
-            if ((retry.getAttempts() - 2) > 0) {
-                Assert.assertTrue(prism.getProcessHelper()
-                    .update(bundles[0].getProcessData(), bundles[0].getProcessData())
-                    .getMessage().contains("updated successfully"),
-                    "process was not updated successfully");
-                String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                    bundles[0].getProcessName(), EntityType.PROCESS);
-
-                Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-                //now to validate all failed instances to check if they were retried or not.
-                validateRetry(clusterOC, bundleId, retry.getAttempts() - 2);
-                if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                    checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-                }
-            }
-        }
-    }
-
-
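-    /**
-     * After the first failure retry, lowers the retry attempts by one and verifies the update
-     * succeeds, reuses the same bundle and enforces the reduced attempt count.
-     */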
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessLowerManageableAttemptUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-                "Failure Retry validation failed");
-
-            retry.setAttempts(retry.getAttempts() - 1);
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertTrue(prism.getProcessHelper()
-                    .update(bundles[0].getProcessData(), bundles[0].getProcessData())
-                    .getMessage().contains("updated successfully"),
-                "process was not updated successfully");
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId, retry.getAttempts() - 1);
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
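-    /**
-     * Waits until two failure retries have occurred, then updates the process to exactly two
-     * attempts and verifies retries stop at that boundary within the same bundle.
-     */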
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessLowerBoundaryAttemptUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            for (int attempt = 0;
-                 attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 2),
-                "Failure Retry validation failed");
-
-
-            retry.setAttempts(2);
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertTrue(
-                prism.getProcessHelper()
-                    .update(bundles[0].getProcessData(), bundles[0].getProcessData())
-                    .getMessage().contains("updated successfully"),
-                "process was not updated successfully");
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId, 2);
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
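-    /**
-     * Raises the retry attempts to four mid-run and verifies the update succeeds without
-     * spawning a new bundle, with failed instances retried four times.
-     */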
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-            retry.setAttempts(4);
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertTrue(prism.getProcessHelper()
-                .update(bundles[0].getProcessName(),
-                    null).getMessage()
-                .contains("updated successfully"), "process was not updated successfully");
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId, 4);
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
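-    /**
-     * Increases the retry delay by one minute mid-run and verifies the update is applied in
-     * place, with retries still honoring the configured attempt count.
-     */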
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessHigherDelayUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-            // getFrequency() returns a String; parse it so "+ 1" is arithmetic, not concatenation
-            retry.setDelay(new Frequency(
-                "minutes(" + (Integer.parseInt(retry.getDelay().getFrequency()) + 1) + ")"));
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertTrue(
-                prism.getProcessHelper().update(bundles[0].getProcessName(),
-                    bundles[0].getProcessData()).getMessage()
-                    .contains("updated successfully"), "process was not updated successfully");
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
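-    /**
-     * Decreases the retry delay by one minute mid-run and verifies the update is applied in
-     * place, with retries continuing under the configured attempt count.
-     */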
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessLowerDelayUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-            retry.setDelay(new Frequency(
-                "minutes(" + (Integer.parseInt(retry.getDelay().getFrequency()) - 1) + ")"));
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertTrue(prism.getProcessHelper()
-                    .update(bundles[0].getProcessName(),
-                        bundles[0].getProcessData()).getMessage()
-                    .contains("updated successfully"),
-                "process was not updated successfully");
-            String newBundleId = OozieUtil
-                .getLatestBundleID(clusterOC, bundles[0].getProcessName(),
-                    EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
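-    /**
-     * Attempts to update the process with a zero-minute retry delay; the update must be
-     * rejected and the original retry behavior must continue unchanged.
-     */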
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInProcessZeroDelayUpdate(Retry retry) throws Exception {
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        bundles[0].setRetry(retry);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-            retry.setDelay(new Frequency("minutes(0)"));
-
-            bundles[0].setRetry(retry);
-
-            LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-            Assert.assertFalse(
-                prism.getProcessHelper().update(bundles[0].getProcessName()
-                    , bundles[0].getProcessData()).getMessage().contains("updated successfully"),
-                "process was updated successfully!!!");
-            String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS);
-
-            Assert.assertEquals(bundleId, newBundleId, "update created a new bundle!");
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
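-    /**
-     * Plain failure scenario with no late policy: every failed instance should be retried
-     * exactly the configured number of times.
-     */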
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInSimpleFailureCase(Retry retry) throws Exception {
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        bundles[0].setProcessLatePolicy(null);
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
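-    /**
-     * Fires a user-initiated instance rerun while automatic failure retries are in flight and
-     * verifies the configured automatic retry count is still honored.
-     */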
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testUserRetryWhileAutomaticRetriesHappen(Retry retry) throws Exception {
-
-        // use HH (24-hour clock) so afternoon instance times are rendered correctly
-        DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        LOGGER.info("process dates: " + startDate + "," + endDate);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            for (int attempt = 0;
-                 attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-                "Failure Retry validation failed");
-
-            //now start firing random retries
-            LOGGER.info("now firing user reruns:");
-            for (int i = 0; i < 1; i++) {
-                prism.getProcessHelper()
-                    .getProcessInstanceRerun(bundles[0].getProcessName(),
-                        "?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z"
-                            + "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z");
-            }
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
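-    /**
-     * Lets all automatic retries finish, then fires a user rerun over the failed window and
-     * verifies the run count goes exactly one beyond the configured attempts.
-     */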
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testUserRetryAfterAutomaticRetriesHappen(Retry retry) throws Exception {
-
-        // use HH (24-hour clock) so afternoon instance times are rendered correctly
-        DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        LOGGER.info("process dates: " + startDate + "," + endDate);
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(),
-                EntityType.PROCESS).get(0);
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-
-            LOGGER.info("now firing user reruns:");
-
-            DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId);
-            InstancesResult piResult = prism.getProcessHelper()
-                .getProcessInstanceRerun(bundles[0].getProcessName(),
-                    "?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z"
-                        + "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1])
-                         .replace("/", "T") + "Z");
-
-            AssertUtil.assertSucceeded(piResult);
-
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts() + 1);
-
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
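-    /**
-     * Suspends the process after the first failure retry, verifies no new retries fire while
-     * it is suspended, then resumes it and checks the remaining retries complete as configured.
-     */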
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInSuspendedAndResumeCaseWithLateData(Retry retry) throws Exception {
-
-        FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feed.setFeedPathValue(latePath);
-        feed.insertLateFeedValue(new Frequency("minutes(10)"));
-        bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
-        bundles[0].getDataSets().add(feed.toString());
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-            List<DateTime> dates = null;
-
-            for (int i = 0; i < 10 && dates == null; ++i) {
-                dates = OozieUtil.getStartTimeForRunningCoordinators(cluster, bundleId);
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertNotNull(dates, String
-                .format("Start time for running coordinators of bundle: %s should not be null.",
-                    bundleId));
-            LOGGER.info("Start time: " + formatter.print(startDate));
-            LOGGER.info("End time: " + formatter.print(endDate));
-            LOGGER.info("candidate nominal time:" + formatter.print(dates.get(0)));
-
-            for (int attempt = 0;
-                 attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-                "Failure Retry validation failed");
-
-            LOGGER.info("now suspending the process altogether....");
-
-            AssertUtil.assertSucceeded(
-                cluster.getProcessHelper().suspend(bundles[0].getProcessData()));
-
-            HashMap<String, Integer> initialMap = getFailureRetriesForEachWorkflow(
-                clusterOC, getDefaultOozieCoordinator(clusterOC, bundleId));
-            LOGGER.info("saved state of workflow retries");
-
-            for (String key : initialMap.keySet()) {
-                LOGGER.info(key + "," + initialMap.get(key));
-            }
-
-            TimeUnit.MINUTES.sleep(10);
-
-
-            HashMap<String, Integer> finalMap = getFailureRetriesForEachWorkflow(
-                clusterOC, getDefaultOozieCoordinator(clusterOC, bundleId));
-            LOGGER.info("final state of process looks like:");
-
-            for (String key : finalMap.keySet()) {
-                LOGGER.info(key + "," + finalMap.get(key));
-            }
-
-            Assert.assertEquals(initialMap.size(), finalMap.size(),
-                "a new workflow retried while process was suspended!!!!");
-
-            for (String key : initialMap.keySet()) {
-                Assert.assertEquals(initialMap.get(key), finalMap.get(key),
-                    "values are different for workflow: " + key);
-            }
-
-            LOGGER.info("now resuming the process...");
-            AssertUtil.assertSucceeded(
-                cluster.getProcessHelper().resume(bundles[0].getProcessData()));
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
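-    /**
-     * Sets the late cut-off from the total retry duration, waits for all retries to finish,
-     * then injects late data into a processed folder and validates the retry behavior again.
-     */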
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInLateDataCase(Retry retry) throws Exception {
-
-        FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feed.setFeedPathValue(latePath);
-
-        feed.insertLateFeedValue(getFrequency(retry));
-
-        bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
-        bundles[0].getDataSets().add(feed.toString());
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            List<String> initialData =
-                Util.getHadoopDataFromDir(clusterFS, bundles[0].getInputFeedFromBundle(),
-                    lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-            List<DateTime> dates = null;
-
-            for (int i = 0; i < 10 && dates == null; ++i) {
-                dates = OozieUtil.getStartTimeForRunningCoordinators(cluster, bundleId);
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertNotNull(dates, String
-                .format("Start time for running coordinators of bundle: %s should not be null.",
-                    bundleId));
-
-            LOGGER.info("Start time: " + formatter.print(startDate));
-            LOGGER.info("End time: " + formatter.print(endDate));
-            LOGGER.info("candidate nominal time:" + formatter.print(dates.get(0)));
-            DateTime now = dates.get(0);
-
-            if (formatter.print(startDate).compareToIgnoreCase(formatter.print(dates.get(0))) > 0) {
-                now = startDate;
-            }
-
-            //now wait till the process is over
-            for (int attempt = 0; attempt < 10 && !validateFailureRetries(
-                clusterOC, bundleId, bundles[0].getProcessObject().getRetry().getAttempts());
-                 ++attempt) {
-                TimeUtil.sleepSeconds(10);
-            }
-            Assert.assertTrue(
-                validateFailureRetries(clusterOC, bundleId,
-                    bundles[0].getProcessObject().getRetry().getAttempts()),
-                "Failure Retry validation failed");
-
-            String insertionFolder =
-                Util.findFolderBetweenGivenTimeStamps(now, now.plusMinutes(5), initialData);
-            LOGGER.info("inserting data in folder " + insertionFolder + " at " + DateTime.now());
-            HadoopUtil.injectMoreData(clusterFS, lateDir + insertionFolder,
-                    OSUtil.concat(OSUtil.OOZIE_EXAMPLE_INPUT_DATA, "lateData"));
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                bundles[0].getProcessObject().getRetry().getAttempts());
-
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
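-    /**
-     * Deletes the process roughly halfway through its retries, sleeps past the remaining retry
-     * window and verifies the retry count stays where it was at deletion time.
-     */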
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInDeleteAfterPartialRetryCase(Retry retry) throws Exception {
-
-        FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feed.setFeedPathValue(latePath);
-        feed.insertLateFeedValue(new Frequency("minutes(1)"));
-        bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
-        bundles[0].getDataSets().add(feed.toString());
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-        } else {
-            AssertUtil.assertSucceeded(response);
-            HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-            HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-            //now wait till the process is over
-            String bundleId = OozieUtil.getBundles(clusterOC,
-                bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-            validateRetry(clusterOC, bundleId,
-                (bundles[0].getProcessObject().getRetry().getAttempts()) / 2);
-
-            AssertUtil.assertSucceeded(
-                prism.getProcessHelper().delete(bundles[0].getProcessData()));
-
-            if (retry.getPolicy() == PolicyType.EXP_BACKOFF) {
-                // '^' is XOR in Java, not exponentiation; square the remaining attempts
-                // with Math.pow when estimating the exp-backoff wait
-                int remainingAttempts = retry.getAttempts()
-                    - bundles[0].getProcessObject().getRetry().getAttempts() / 2;
-                TimeUnit.MINUTES.sleep(retry.getDelay().getFrequencyAsInt()
-                    * (long) Math.pow(remainingAttempts, 2));
-            } else {
-                TimeUnit.MINUTES
-                    .sleep(retry.getDelay().getFrequencyAsInt()
-                        * ((bundles[0].getProcessObject().getRetry().getAttempts())
-                        - (bundles[0].getProcessObject().getRetry().getAttempts()) / 2));
-            }
-
-            //now to validate all failed instances to check if they were retried or not.
-            validateRetry(clusterOC, bundleId,
-                (bundles[0].getProcessObject().getRetry().getAttempts()) / 2);
-
-            if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
-                checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-            }
-        }
-    }
-
-
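-    /**
-     * Polls (up to ~10 minutes) until every failed instance of the bundle's default coordinator
-     * has been retried exactly maxNumberOfRetries times, asserting if that state is not reached.
-     */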
-    private void validateRetry(OozieClient oozieClient, String bundleId, int maxNumberOfRetries)
-        throws Exception {
-        //validate that all failed processes were retried the specified number of times.
-        for (int i = 0; i < 60 && getDefaultOozieCoordinator(oozieClient, bundleId) == null; ++i) {
-            TimeUtil.sleepSeconds(10);
-        }
-        final CoordinatorJob defaultCoordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-        Assert.assertNotNull(defaultCoordinator, "Unexpected value of defaultCoordinator");
-
-        for (int i = 0;
-             i < 60 && !validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries); ++i) {
-            LOGGER.info("desired state not reached, attempt number: " + i);
-            TimeUtil.sleepSeconds(10);
-        }
-        Assert.assertTrue(validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries),
-            "all retries were not attempted correctly!");
-    }
-
-
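-    /**
-     * Returns true only when every action of the default coordinator has either succeeded or
-     * reached exactly maxNumberOfRetries runs; asserts if any workflow overshoots the limit.
-     */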
-    private boolean validateFailureRetries(OozieClient oozieClient, String bundleId,
-                                           int maxNumberOfRetries) throws Exception {
-        final CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-        if (maxNumberOfRetries < 0) {
-            maxNumberOfRetries = 0;
-        }
-        LOGGER.info("coordinator: " + coordinator);
-        HashMap<String, Boolean> workflowMap = new HashMap<>();
-
-        if (coordinator == null || coordinator.getActions().size() == 0) {
-            return false;
-        }
-        LOGGER.info("coordinator.getActions(): " + coordinator.getActions());
-        for (CoordinatorAction action : coordinator.getActions()) {
-
-            if (null == action.getExternalId()) {
-                return false;
-            }
-
-
-            WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId());
-            LOGGER
-                .info("actionInfo: " + actionInfo + " actionInfo.getRun(): " + actionInfo.getRun());
-
-
-            if (!(actionInfo.getStatus() == WorkflowJob.Status.SUCCEEDED
-                || actionInfo.getStatus() == WorkflowJob.Status.RUNNING)) {
-                if (actionInfo.getRun() == maxNumberOfRetries) {
-                    workflowMap.put(actionInfo.getId(), true);
-                } else {
-                    Assert.assertTrue(actionInfo.getRun() < maxNumberOfRetries,
-                        "The workflow exceeded the max number of retries specified for it!!!!");
-                    workflowMap.put(actionInfo.getId(), false);
-                }
-
-            } else if (actionInfo.getStatus() == WorkflowJob.Status.SUCCEEDED) {
-                workflowMap.put(actionInfo.getId(), true);
-            }
-        }
-
-        //first make sure that the map has all the entries for the coordinator:
-        if (workflowMap.size() != coordinator.getActions().size()) {
-            return false;
-        } else {
-            boolean result = true;
-
-            for (String key : workflowMap.keySet()) {
-                result &= workflowMap.get(key);
-            }
-
-            return result;
-        }
-    }
-
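-    /**
-     * Returns the coordinator of this bundle whose app name contains "DEFAULT", or null.
-     */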
-    public CoordinatorJob getDefaultOozieCoordinator(OozieClient oozieClient, String bundleId)
-        throws Exception {
-        BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId);
-
-        for (CoordinatorJob coord : bundlejob.getCoordinators()) {
-            if (coord.getAppName().contains("DEFAULT")) {
-                return oozieClient.getCoordJobInfo(coord.getId());
-            }
-        }
-        return null;
-    }
-
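-    /**
-     * Builds the retry matrix: the cross product of delays {2, 0}, delay units {minutes},
-     * policies {periodic, exp-backoff} and attempt counts {2, 0, 3}.
-     */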
-    @DataProvider(name = "DP")
-    public Object[][] getData() {
-
-        String[] retryTypes = new String[]{"periodic", "exp-backoff"};
-        Integer[] delays = new Integer[]{2, 0}; // -1 is rejected at validation time, so it is
-                                                // not part of the matrix
-        String[] delayUnits = new String[]{"minutes"};
-        Integer[] retryAttempts = new Integer[]{2, 0, 3};
-
-        Object[][] crossProd = MatrixUtil
-            .crossProduct(delays, delayUnits, retryTypes, retryAttempts);
-        Object[][] testData = new Object[crossProd.length][1];
-        for (int i = 0; i < crossProd.length; ++i) {
-            final Integer delay = (Integer) crossProd[i][0];
-            final String delayUnit = (String) crossProd[i][1];
-            final String retryType = (String) crossProd[i][2];
-            final Integer retry = (Integer) crossProd[i][3];
-            testData[i][0] = getRetry(delay, delayUnit, retryType, retry);
-        }
-        return testData;
-    }
-
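-    /**
-     * Blocks until at least the given percentage of the default coordinator's actions are in
-     * RUNNING state. Note that once the coordinator exists this wait has no upper bound.
-     */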
-    private void waitTillCertainPercentageOfProcessHasStarted(OozieClient oozieClient,
-                                                              String bundleId, int percentage)
-        throws Exception {
-        OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
-        CoordinatorJob defaultCoordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-
-        // make sure default coordinator is not null before we proceed
-        for (int i = 0; i < 120 && (defaultCoordinator == null || defaultCoordinator.getStatus()
-            == CoordinatorJob.Status.PREP); ++i) {
-            TimeUtil.sleepSeconds(10);
-            defaultCoordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-        }
-        Assert.assertNotNull(defaultCoordinator, "default coordinator is null");
-        Assert.assertNotEquals(defaultCoordinator.getStatus(), CoordinatorJob.Status.PREP,
-            "Unexpected state for coordinator job: " + defaultCoordinator.getId());
-        int totalCount = defaultCoordinator.getActions().size();
-
-        int percentageConversion = (percentage * totalCount) / 100;
-
-        while (percentageConversion > 0) {
-            int doneBynow = 0;
-            for (CoordinatorAction action : defaultCoordinator.getActions()) {
-                CoordinatorAction actionInfo = oozieClient.getCoordActionInfo(action.getId());
-                if (actionInfo.getStatus() == CoordinatorAction.Status.RUNNING) {
-                    doneBynow++;
-                }
-            }
-            if (doneBynow >= percentageConversion) {
-                break;
-            }
-            TimeUtil.sleepSeconds(10);
-        }
-    }
-
-
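-    /**
-     * Maps each materialized workflow id of the coordinator to its current run (retry) count.
-     */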
-    private HashMap<String, Integer> getFailureRetriesForEachWorkflow(OozieClient oozieClient,
-                                                                      CoordinatorJob coordinator)
-        throws Exception {
-        HashMap<String, Integer> workflowRetryMap = new HashMap<>();
-        for (CoordinatorAction action : coordinator.getActions()) {
-
-            if (null == action.getExternalId()) {
-                continue;
-            }
-
-            WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId());
-            LOGGER.info("adding workflow " + actionInfo.getId() + " to the map");
-            workflowRetryMap.put(actionInfo.getId(), actionInfo.getRun());
-        }
-        return workflowRetryMap;
-    }
-
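-    /**
-     * Collects, in ascending order, the start times of all workflows that have been retried at
-     * least once (run count > 0).
-     */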
-    private DateTime[] getFailureTimeBoundaries(OozieClient oozieClient, String bundleId)
-        throws Exception {
-        List<DateTime> dateList = new ArrayList<>();
-
-        CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-
-        for (CoordinatorAction action : coordinator.getActions()) {
-            if (action.getExternalId() != null) {
-
-                WorkflowJob jobInfo = oozieClient.getJobInfo(action.getExternalId());
-                if (jobInfo.getRun() > 0) {
-                    dateList.add(new DateTime(jobInfo.getStartTime(), DateTimeZone.UTC));
-                }
-            }
-        }
-        Collections.sort(dateList);
-        return dateList.toArray(new DateTime[dateList.size()]);
-    }
-
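-    /**
-     * Cross-checks instance finish and retry timestamps from the logs against the configured
-     * delay: a constant gap for PERIODIC, a doubling gap for EXP_BACKOFF (5-second tolerance).
-     */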
-    private void checkIfRetriesWereTriggeredCorrectly(ColoHelper coloHelper, Retry retry,
-                                                      String bundleId) throws Exception {
-        // It is presumed that the delay is expressed in minutes; hourly/daily delays are
-        // infeasible to check here.
-
-        final DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("HH:mm:ss");
-
-        final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient();
-        final CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-
-        for (CoordinatorAction action : coordinator.getActions()) {
-            // getCoordActionInfo() expects the coordinator action id, not the workflow id
-            CoordinatorAction coordAction = oozieClient.getCoordActionInfo(action.getId());
-            if (!(coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED)) {
-                int expectedDelay = retry.getDelay().getFrequencyAsInt();
-                //first get data from logs:
-                List<String> instanceRetryTimes =
-                    Util.getInstanceRetryTimes(coloHelper, action.getExternalId());
-                List<String> instanceFinishTimes =
-                    Util.getInstanceFinishTimes(coloHelper, action.getExternalId());
-
-                LOGGER.info("finish times look like:");
-                for (String line : instanceFinishTimes) {
-                    LOGGER.info(line);
-                }
-
-                LOGGER.info("retry times look like:");
-                for (String line : instanceRetryTimes) {
-                    LOGGER.info(line);
-                }
-
-                LOGGER.info("checking timelines for retry type " + retry.getPolicy().value()
-                    + " for delay " + expectedDelay + " for workflow id: " + action.getExternalId());
-
-                if (retry.getPolicy() == PolicyType.PERIODIC) {
-                    //in this case the delay unit will always be a constant time diff
-                    for (int i = 0; i < instanceFinishTimes.size() - 1; i++) {
-                        DateTime temp = timeFormatter.parseDateTime(instanceFinishTimes.get(i));
-
-                        Assert.assertEquals(temp.plusMinutes(expectedDelay).getMillis(),
-                            timeFormatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(),
-                            5000, "oops! this is out of expected delay range for workflow id  "
-                                + action.getExternalId());
-                    }
-                } else {
-                    //check for exponential
-                    for (int i = 0; i < instanceFinishTimes.size() - 1; i++) {
-                        DateTime temp = timeFormatter.parseDateTime(instanceFinishTimes.get(i));
-                        Assert.assertEquals(temp.plusMinutes(expectedDelay).getMillis(),
-                            timeFormatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(),
-                            5000, "oops! this is out of expected delay range for workflow id "
-                                + action.getExternalId());
-                        expectedDelay *= 2;
-                    }
-                }
-            }
-        }
-
-    }
-
-    private Retry getRetry(int delay, String delayUnits, String retryType,
-                           int retryAttempts) {
-        Retry retry = new Retry() {
-            @Override
-            public String toString(){
-                return String.format("Frequency: %s; Attempts: %s; PolicyType: %s",
-                    this.getDelay(), this.getAttempts(), this.getPolicy());
-            }
-        };
-        retry.setAttempts(retryAttempts);
-        retry.setDelay(new Frequency(delayUnits + "(" + delay + ")"));
-        retry.setPolicy(PolicyType.fromValue(retryType));
-        return retry;
-    }
-
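-    /**
-     * Derives a late-data cut-off spanning the whole retry window: delay * attempts for
-     * periodic retries, or delay * 2^attempts for exponential backoff.
-     */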
-    private Frequency getFrequency(Retry retry) {
-        int delay = retry.getDelay().getFrequencyAsInt();
-        if (delay == 0) {
-            delay = 1;
-        }
-        int attempts = retry.getAttempts();
-        if (attempts == 0) {
-            attempts = 1;
-        }
-
-        if (retry.getPolicy() == PolicyType.EXP_BACKOFF) {
-            // '^' is XOR in Java, not exponentiation: use a real power of two here
-            delay = Math.abs(delay) * (int) Math.pow(2, Math.abs(attempts));
-        } else {
-            delay = Math.abs(delay * attempts);
-        }
-
-        return new Frequency(retry.getDelay().getTimeUnit() + "(" + delay + ")");
-
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
deleted file mode 100644
index 0711e8a..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.supportClasses.JmsMessageConsumer;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-
-/**
- * Tests for processes that define no output feed.
- */
-@Test(groups = "embedded")
-public class NoOutputProcessTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String testDir = cleanAndGetTestDir();
-    private String inputPath = testDir + "/input" + MINUTE_DATE_PATTERN;
-    private String workflowForNoIpOp = cleanAndGetTestDir();
-    private static final Logger LOGGER = Logger.getLogger(NoOutputProcessTest.class);
-
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        LOGGER.info("in @BeforeClass");
-        uploadDirToClusters(workflowForNoIpOp, OSUtil.concat(OSUtil.RESOURCES, "workflows", "aggregatorNoOutput"));
-        Bundle b = BundleUtil.readELBundle();
-        b.generateUniqueBundle(this);
-        b = new Bundle(b, cluster);
-        String startDate = "2010-01-03T00:00Z";
-        String endDate = "2010-01-03T03:00Z";
-        b.setInputFeedDataPath(inputPath);
-        String prefix = b.getFeedDataPathPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
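-        // dataDates covers 2010-01-03T00:00Z..03:00Z in 20-minute steps; each entry maps to an
-        // input dir such as <prefix>/2010/01/03/00/20 (assuming the MINUTE_DATE_PATTERN layout)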
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0].generateUniqueBundle(this);
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].setProcessWorkflow(workflowForNoIpOp);
-        bundles[0].setInputFeedDataPath(inputPath);
-        bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
-        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        ProcessMerlin process = bundles[0].getProcessObject();
-        process.setOutputs(null);
-        process.setLateProcess(null);
-        bundles[0].submitFeedsScheduleProcess(prism);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Waits till the process completes successfully. Checks that JMS messages on the entity
-     * topic reflect info about the succeeded process instances as expected.
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void checkForJMSMsgWhenNoOutput() throws Exception {
-        LOGGER.info("attaching messageConsumer to:   " + "FALCON.ENTITY.TOPIC");
-        JmsMessageConsumer messageConsumer =
-            new JmsMessageConsumer("FALCON.ENTITY.TOPIC", cluster.getClusterHelper().getActiveMQ());
-        messageConsumer.start();
-
-        //wait for all the instances to complete
-        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        messageConsumer.interrupt();
-        Util.printMessageData(messageConsumer);
-        Assert.assertEquals(messageConsumer.getReceivedMessages().size(), 3,
-            "Messages for all 3 instances not found");
-    }
-
-    /**
-     * Waits till the process completes successfully. Checks that JMS messages on both the
-     * entity topic and the process-specific topic reflect the succeeded process instances.
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void checkForJMSMsgOnEntityAndProcessTopics() throws Exception {
-        JmsMessageConsumer consumerEntityMsg =
-            new JmsMessageConsumer("FALCON.ENTITY.TOPIC", cluster.getClusterHelper().getActiveMQ());
-        JmsMessageConsumer consumerProcessMsg =
-            new JmsMessageConsumer("FALCON." + bundles[0].getProcessName(),
-                cluster.getClusterHelper().getActiveMQ());
-        consumerEntityMsg.start();
-        consumerProcessMsg.start();
-
-        //wait for all the instances to complete
-        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        consumerEntityMsg.interrupt();
-        consumerProcessMsg.interrupt();
-        Util.printMessageData(consumerEntityMsg);
-        Util.printMessageData(consumerProcessMsg);
-        Assert.assertEquals(consumerEntityMsg.getReceivedMessages().size(), 3,
-            "Messages for all 3 instances not found on the entity topic");
-        Assert.assertEquals(consumerProcessMsg.getReceivedMessages().size(), 3,
-            "Messages for all 3 instances not found on the process topic");
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
deleted file mode 100644
index b0480e9..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Test process with different frequency combinations.
- */
-@Test(groups = "embedded")
-public class ProcessFrequencyTest extends BaseTestClass {
-    private static final Logger LOGGER = Logger.getLogger(ProcessFrequencyTest.class);
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String baseTestHDFSDir = cleanAndGetTestDir();
-    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
-
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        LOGGER.info("in @BeforeClass");
-        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Tests process scheduling with different frequencies. Expects the process workflow to run
-     * successfully.
-     * @throws Exception
-     */
-    @Test(dataProvider = "generateProcessFrequencies")
-    public void testProcessWithFrequency(final FreqType freqType, final int freqAmount)
-        throws Exception {
-        final String startDate = "2010-01-02T01:00Z";
-        final String endDate = "2010-01-02T01:01Z";
-        final String inputPath = baseTestHDFSDir + "/input/";
-        bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
-        bundles[0].setOutputFeedLocationData(
-            baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
-        bundles[0].setProcessPeriodicity(freqAmount, freqType.getFalconTimeUnit());
-        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
-        bundles[0].setProcessValidity(startDate, endDate);
-        HadoopUtil.deleteDirIfExists(inputPath, clusterFS);
-        bundles[0].submitFeedsScheduleProcess(prism);
-
-        //upload data
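-        // startPath renders startDate through the freqType formatter, e.g. (illustrative) for
-        // FreqType.DAILY: <inputPath>2010/01/02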
-        final String startPath = inputPath + freqType.getFormatter().print(
-            TimeUtil.oozieDateToDate(startDate));
-        HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
-
-        final String processName = bundles[0].getProcessName();
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
-        InstancesResult result = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccessWOInstances(result);
-    }
-
-    @DataProvider(name = "generateProcessFrequencies")
-    public Object[][] generateProcessFrequencies() {
-        return new Object[][] {
-            {FreqType.MINUTELY, 2, },
-            {FreqType.HOURLY, 3, },
-            {FreqType.DAILY, 5, },
-            {FreqType.MONTHLY, 7, },
-        };
-    }
-
-    /**
-     * Tests process submission with a bad frequency. Expects the submissions to fail.
-     * @throws Exception
-     */
-    @Test
-    public void testProcessWithBadFrequency() throws Exception {
-        final String startDate = "2010-01-02T01:00Z";
-        final String endDate = "2010-01-02T01:01Z";
-        final String inputPath = baseTestHDFSDir + "/input/";
-        final FreqType freqType = FreqType.MINUTELY;
-        bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
-        bundles[0].setOutputFeedLocationData(
-            baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
-        bundles[0].submitClusters(prism);
-        bundles[0].submitFeeds(prism);
-
-        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
-        bundles[0].setProcessValidity(startDate, endDate);
-        final ProcessMerlin processMerlin = bundles[0].getProcessObject();
-        //a frequency can be bad in two ways - it can have bad amount or it can have bad unit
-        //submit process with bad amount
-        processMerlin.setFrequency(new Frequency("BadAmount", freqType.getFalconTimeUnit()));
-        AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(processMerlin.toString()));
-
-        //submit process with bad unit
-        processMerlin.setFrequency(new Frequency("2993", freqType.getFalconTimeUnit()));
-        final String process = processMerlin.toString();
-        final String newProcess = process.replaceAll("minutes\\(2993\\)", "BadUnit(2993)");
-        AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(newProcess));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
deleted file mode 100644
index 91d39a7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Process instance mixed colo tests.
- */
-@Test(groups = "embedded")
-public class ProcessInstanceColoMixedTest extends BaseTestClass {
-
-    private final String baseTestHDFSDir = cleanAndGetTestDir();
-    private final String feedPath = baseTestHDFSDir + "/feed0%d" + MINUTE_DATE_PATTERN;
-    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-    private FileSystem cluster1FS = serverFS.get(0);
-    private FileSystem cluster2FS = serverFS.get(1);
-    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceColoMixedTest.class);
-
-    @BeforeClass(alwaysRun = true)
-    public void prepareClusters() throws Exception {
-        LOGGER.info("in @BeforeClass");
-        HadoopUtil.uploadDir(cluster1FS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        HadoopUtil.uploadDir(cluster2FS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        //generate bundles according to config files
-        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
-        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
-        bundles[0].generateUniqueBundle(this);
-        bundles[1].generateUniqueBundle(this);
-
-        //set cluster colos
-        bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
-        LOGGER.info("cluster b1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-        bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
-        LOGGER.info("cluster b2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
-
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
-        //submit 2 clusters
-        Bundle.submitCluster(bundles[0], bundles[1]);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(timeOut = 12000000)
-    public void mixed01C1sC2sC1eC2e() throws Exception {
-        //cluster1 is the source colo for feed01 and the target for feed02; cluster2 is the
-        //source for feed02 and the target for feed01
-        //get 2 unique feeds
-        FeedMerlin feed01 = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        FeedMerlin feed02 = new FeedMerlin(bundles[1].getInputFeedFromBundle());
-        FeedMerlin outputFeed = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
-        //set source and target for the 2 feeds
-
-        //clear existing feed clusters before assigning new source/target clusters
-        feed01.clearFeedClusters();
-        feed02.clearFeedClusters();
-        outputFeed.clearFeedClusters();
-
-        //set new feed input data
-        feed01.setFeedPathValue(String.format(feedPath, 1));
-        feed02.setFeedPathValue(String.format(feedPath, 2));
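-        // feedPath is baseTestHDFSDir + "/feed0%d" + MINUTE_DATE_PATTERN, so feed01 and feed02
-        // write to distinct input trees: .../feed01/... and .../feed02/...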
-
-        //generate data in both the colos ua1 and ua3
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
-            TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
-
-        String prefix = feed01.getFeedPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
-        HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
-
-        prefix = feed02.getFeedPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
-        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
-
-        String startTime = TimeUtil.getTimeWrtSystemTime(-70);
-
-        //set clusters for feed01
-        feed01.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        feed01.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.TARGET)
-                .build());
-
-        //set clusters for feed02
-        feed02.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.TARGET)
-                .build());
-        feed02.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-
-        //set clusters for output feed
-        outputFeed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        outputFeed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(startTime, "2099-01-01T00:00Z")
-                .withClusterType(ClusterType.TARGET)
-                .build());
-
-        //submit and schedule feeds
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed02.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed.toString()));
-
-        String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
-
-        ProcessMerlin process = bundles[0].getProcessObject();
-        process.clearProcessCluster();
-        process.addProcessCluster(
-            new ProcessMerlin.ProcessClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withValidity(processStartTime, TimeUtil.addMinsToTime(processStartTime, 35))
-                .build());
-        process.addProcessCluster(
-            new ProcessMerlin.ProcessClusterBuilder(
-                Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
-                    TimeUtil.addMinsToTime(processStartTime, 45))
-                .build());
-        process.addInputFeed(feed02.getName(), feed02.getName());
-
-        //submit and schedule process
-        prism.getProcessHelper().submitAndSchedule(process.toString());
-
-        LOGGER.info("Wait till process goes into running ");
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), process.getName(), 1,
-            Status.RUNNING, EntityType.PROCESS);
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
-            Status.RUNNING, EntityType.PROCESS);
-
-        InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
-                "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(process.getName(),
-            "?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
-                + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
-            "?start=" + TimeUtil.addMinsToTime(processStartTime, 37)
-                + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = prism.getProcessHelper().getProcessInstanceResume(process.getName(),
-            "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
-            "?start=" + TimeUtil.addMinsToTime(processStartTime, 16)
-                + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(process.getName(),
-            "?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-
-        responseInstance = prism.getProcessHelper().getProcessInstanceRerun(process.getName(),
-            "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7));
-        AssertUtil.assertSucceeded(responseInstance);
-        Assert.assertNotNull(responseInstance.getInstances());
-    }
-}
-