You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:09 UTC

[10/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
new file mode 100644
index 0000000..f4dfe78
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
@@ -0,0 +1,1128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismFeedDeleteTest extends BaseTestClass {
+
+    private boolean restartRequired;
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    private String cluster1Colo = cluster1.getClusterHelper().getColoName();
+    private String cluster2Colo = cluster2.getClusterHelper().getColoName();
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedDeleteTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedDeleteTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        restartRequired = false;
+        Bundle bundle = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+
+        bundles[1] = new Bundle(bundle, cluster2);
+        bundles[1].generateUniqueBundle();
+        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (restartRequired) {
+            Util.restartService(cluster1.getFeedHelper());
+        }
+        removeBundles();
+    }
+
+    /**
+     * NOTE: All test cases assume that there are two entities scheduled in each colo
+     */
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteInBothColos() throws Exception {
+        bundles[0].submitFeed();
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, feedName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, feedName);
+
+        //server1:
+        compareDataStoreStates(initialServer1Store, finalServer1Store, feedName);
+        compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, feedName);
+
+        //server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitFeed();
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+
+        //bring down Server2 colo :P
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+        Util.startService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        List<String> Server2ArchivePostUp = cluster2.getFeedHelper().getArchiveInfo();
+        List<String> Server2StorePostUp = cluster2.getFeedHelper().getStoreInfo();
+
+        List<String> Server1ArchivePostUp = cluster1.getFeedHelper().getArchiveInfo();
+        List<String> Server1StorePostUp = cluster1.getFeedHelper().getStoreInfo();
+
+        List<String> prismHelperArchivePostUp = prism.getFeedHelper().getArchiveInfo();
+        List<String> prismHelperStorePostUp = prism.getFeedHelper().getStoreInfo();
+
+        compareDataStoreStates(finalPrismStore, prismHelperStorePostUp, clusterName);
+        compareDataStoreStates(prismHelperArchivePostUp, finalPrismArchiveStore, clusterName);
+
+        compareDataStoreStates(initialServer1Store, Server1StorePostUp, clusterName);
+        compareDataStoreStates(Server1ArchivePostUp, finalServer1ArchiveStore, clusterName);
+
+        compareDataStoresForEquality(finalServer2Store, Server2StorePostUp);
+        compareDataStoresForEquality(finalServer2ArchiveStore, Server2ArchivePostUp);
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteAlreadyDeletedFeed() throws Exception {
+        restartRequired = true;
+        bundles[0].submitFeed();
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteTwiceWhen1ColoIsDownDuring1stDelete() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitFeed();
+
+        Util.shutDownService(cluster1.getClusterHelper());
+
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //start up service
+        Util.startService(cluster1.getFeedHelper());
+
+        //delete again
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //get final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+        //Server1:
+        compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+        compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteNonExistent() throws Exception {
+        //now lets get the final states
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //get final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testServer1FeedDeleteNonExistentWhen1ColoIsDownDuringDelete() throws Exception {
+        restartRequired = true;
+        bundles[0] = new Bundle(bundles[0], cluster1);
+        bundles[1] = new Bundle(bundles[1], cluster2);
+
+        bundles[0].setCLusterColo(cluster1Colo);
+        logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        ServiceResponse r = prism.getClusterHelper()
+            .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        bundles[1].setCLusterColo(cluster2Colo);
+        logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        String startTimeServer1 = "2012-10-01T12:00Z";
+        String startTimeServer2 = "2012-10-01T12:00Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}",
+                baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                baseHDFSDir +
+                    "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed));
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedScheduledInOneColo() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+        //Server1:
+        compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+        compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedSuspendedInOneColo() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //suspend Server1 colo thingy
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+        //Server1:
+        compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+        compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedSuspendedInOneColoWhileBothFeedsAreSuspended() throws Exception {
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //suspend Server1 colo thingy
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+        //Server1:
+        compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+        compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedSuspendedInOneColoWhileThatColoIsDown()
+        throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //shutdown Server1
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+        Util.startService(cluster1.getClusterHelper());
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        List<String> Server1StorePostUp = cluster1.getFeedHelper().getStoreInfo();
+        List<String> Server1ArchivePostUp = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> Server2StorePostUp = cluster2.getFeedHelper().getStoreInfo();
+        List<String> Server2ArchivePostUp = cluster2.getFeedHelper().getArchiveInfo();
+
+        List<String> prismStorePostUp = prism.getFeedHelper().getStoreInfo();
+        List<String> prismArchivePostUp = prism.getFeedHelper().getArchiveInfo();
+
+
+        compareDataStoresForEquality(Server2StorePostUp, finalServer2Store);
+        compareDataStoresForEquality(Server2ArchivePostUp, finalServer2ArchiveStore);
+
+        compareDataStoreStates(finalServer1Store, Server1StorePostUp, clusterName);
+        compareDataStoreStates(Server1ArchivePostUp, finalServer1ArchiveStore, clusterName);
+
+        compareDataStoreStates(finalPrismStore, prismStorePostUp, clusterName);
+        compareDataStoreStates(prismArchivePostUp, finalPrismArchiveStore, clusterName);
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedSuspendedInOneColoWhileThatColoIsDownAndOtherHasSuspendedFeed()
+        throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //shutdown Server1
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //lets now delete the feed from both colos
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+        Util.startService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        HashMap<String, List<String>> finalSystemState = getSystemState(EntityType.FEED);
+
+        compareDataStoreStates(finalSystemState.get("prismArchive"), finalPrismArchiveStore,
+            clusterName);
+        compareDataStoreStates(finalPrismStore, finalSystemState.get("prismStore"), clusterName);
+
+        compareDataStoreStates(finalServer1Store, finalSystemState.get("Server1Store"),
+            clusterName);
+        compareDataStoreStates(finalSystemState.get("Server1Archive"), finalServer1ArchiveStore,
+            clusterName);
+
+        compareDataStoresForEquality(finalSystemState.get("Server2Archive"),
+            finalServer2ArchiveStore);
+        compareDataStoresForEquality(finalSystemState.get("Server2Store"), finalServer2Store);
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedScheduledInOneColoWhileThatColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //shutdown Server1
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertFailed(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+        compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+
+        Util.startService(cluster1.getClusterHelper());
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        HashMap<String, List<String>> systemStatePostUp = getSystemState(EntityType.FEED);
+
+        compareDataStoreStates(finalPrismStore, systemStatePostUp.get("prismStore"), clusterName);
+        compareDataStoreStates(systemStatePostUp.get("prismArchive"), finalPrismArchiveStore,
+            clusterName);
+
+        compareDataStoreStates(finalServer1Store, systemStatePostUp.get("Server1Store"),
+            clusterName);
+        compareDataStoreStates(systemStatePostUp.get("Server1Archive"), finalServer1ArchiveStore,
+            clusterName);
+
+        compareDataStoresForEquality(finalServer2ArchiveStore,
+            systemStatePostUp.get("Server2Archive"));
+        compareDataStoresForEquality(finalServer2Store, systemStatePostUp.get("Server2Store"));
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedSuspendedInOneColoWhileAnotherColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].setCLusterColo(cluster1Colo);
+        logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        ServiceResponse r =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        bundles[1].setCLusterColo(cluster2Colo);
+        logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        String startTimeServer1 = "2012-10-01T12:00Z";
+        String startTimeServer2 = "2012-10-01T12:00Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}",
+                baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                baseHDFSDir +
+                    "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+
+        AssertUtil.assertSucceeded(r);
+
+        r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+        TimeUtil.sleepSeconds(15);
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        r = prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertPartial(r);
+        Assert
+            .assertTrue(r.getMessage().contains(cluster1Colo + "/org.apache.falcon.FalconException")
+                && r.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+
+        ServiceResponse response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed);
+        Assert.assertTrue(
+            response.getMessage().contains(cluster1Colo + "/org.apache.falcon.FalconException")
+                && response.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+        AssertUtil.assertPartial(response);
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(finalServer1ArchiveStore, initialServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+        compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+    }
+
+    @Test(enabled = true)
+    public void testDeleteFeedSuspendedInOneColoWhileAnotherColoIsDownWithFeedSuspended()
+        throws Exception {
+        restartRequired = true;
+
+        bundles[0].setCLusterColo(cluster1Colo);
+        logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+        ServiceResponse r =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        bundles[1].setCLusterColo(cluster2Colo);
+        logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        String startTimeServer1 = "2012-10-01T12:00Z";
+        String startTimeServer2 = "2012-10-01T12:00Z";
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+            XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+                "${cluster.colo}",
+                baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        feed = InstanceUtil
+            .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+                XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                baseHDFSDir + "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/$" +
+                    "{MINUTE}");
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+
+        AssertUtil.assertSucceeded(r);
+
+        r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+        AssertUtil.assertSucceeded(r);
+        TimeUtil.sleepSeconds(15);
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        r = prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed);
+        TimeUtil.sleepSeconds(10);
+        AssertUtil.assertSucceeded(r);
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        ServiceResponse response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed);
+        Assert.assertTrue(response.getMessage().contains(cluster1Colo + "/org.apache.falcon" +
+            ".FalconException")
+            && response.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+        AssertUtil.assertPartial(response);
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+        //prism:
+        compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+        compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(finalServer1ArchiveStore, initialServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+        compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+    }
+
+
+    @Test(groups = {"multiCluster"})
+    public void testDeleteFeedScheduledInOneColoWhileAnotherColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitAndScheduleFeed();
+        bundles[1].submitAndScheduleFeed();
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //shutdown Server1
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        //lets now delete the cluster from both colos
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+        List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+        List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+        List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+        //now ensure that data has been deleted from all cluster store and is present in the
+        // cluster archives
+
+        String clusterName = Util.readEntityName(bundles[1].getDataSets().get(0));
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+        //Server1:
+        compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+        compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+        //Server2:
+        compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+        compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+
+        Util.startService(cluster1.getFeedHelper());
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+        clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+
+        HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.FEED);
+
+        compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+            clusterName);
+        compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+
+        compareDataStoreStates(systemPostUp.get("Server1Archive"), finalServer1ArchiveStore,
+            clusterName);
+        compareDataStoreStates(finalServer1Store, systemPostUp.get("Server1Store"), clusterName);
+
+        compareDataStoresForEquality(finalServer2ArchiveStore, systemPostUp.get("Server2Archive"));
+        compareDataStoresForEquality(finalServer2Store, systemPostUp.get("Server2Store"));
+    }
+
+    private void compareDataStoreStates(List<String> initialState, List<String> finalState,
+                                        String filename) {
+        List<String> temp = new ArrayList<String>();
+        temp.addAll(initialState);
+        temp.removeAll(finalState);
+        Assert.assertEquals(temp.size(), 1);
+        Assert.assertTrue(temp.get(0).contains(filename));
+
+    }
+
+    private void compareDataStoresForEquality(List<String> store1, List<String> store2) {
+        Assert.assertEquals(store1.size(), store2.size(), "DataStores are not equal!");
+        Assert.assertTrue(Arrays.deepEquals(store2.toArray(new String[store2.size()]),
+            store1.toArray(new String[store1.size()])), "DataStores are not equal!");
+    }
+
+    public HashMap<String, List<String>> getSystemState(EntityType entityType) throws Exception {
+        IEntityManagerHelper prismHelper = prism.getClusterHelper();
+        IEntityManagerHelper server1Helper = cluster1.getClusterHelper();
+        IEntityManagerHelper server2Helper = cluster2.getClusterHelper();
+
+        if (entityType == EntityType.FEED) {
+            prismHelper = prism.getFeedHelper();
+            server1Helper = cluster1.getFeedHelper();
+            server2Helper = cluster2.getFeedHelper();
+        }
+
+        if (entityType == EntityType.PROCESS) {
+            prismHelper = prism.getProcessHelper();
+            server1Helper = cluster1.getProcessHelper();
+            server2Helper = cluster2.getProcessHelper();
+        }
+
+        HashMap<String, List<String>> temp = new HashMap<String, List<String>>();
+        temp.put("prismArchive", prismHelper.getArchiveInfo());
+        temp.put("prismStore", prismHelper.getStoreInfo());
+        temp.put("Server1Archive", server1Helper.getArchiveInfo());
+        temp.put("Server1Store", server1Helper.getStoreInfo());
+        temp.put("Server2Archive", server2Helper.getArchiveInfo());
+        temp.put("Server2Store", server2Helper.getStoreInfo());
+
+        return temp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
new file mode 100644
index 0000000..77d07af
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismFeedLateReplicationTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster1FS = serverFS.get(0);
+    FileSystem cluster2FS = serverFS.get(1);
+    FileSystem cluster3FS = serverFS.get(2);
+    private String baseTestDir = baseHDFSDir + "/PrismFeedLateReplicationTest";
+    private String inputPath =
+        baseTestDir + "/input-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismFeedLateReplicationTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        for (int i = 0; i < 3; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void multipleSourceOneTarget_pastData() throws Exception {
+
+        bundles[0].setInputFeedDataPath(inputPath);
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenish(cluster2FS, 90, 1, prefix, postFix);
+
+
+        postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
+        prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(-30);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "US/${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+            null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+
+        String bundleId =
+            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+        //wait till 1st instance of replication coord is SUCCEEDED
+        List<String> replicationCoordIDTarget = InstanceUtil
+            .getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+            TimeUtil.sleepSeconds(20);
+        }
+
+        TimeUtil.sleepSeconds(15);
+
+        List<String> inputFolderListForColo1 =
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(0), 1);
+        List<String> inputFolderListForColo2 =
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(1), 1);
+
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo2);
+    }
+
+    @Test(groups = {"multiCluster"})
+    public void multipleSourceOneTarget_futureData() throws Exception {
+
+        bundles[0].setInputFeedDataPath(inputPath);
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "US/${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+            null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "UK/${cluster.colo}");
+
+
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+
+        String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenish(cluster2FS, 90, 1, prefix, postFix);
+
+        postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
+        prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        TimeUtil.sleepSeconds(60);
+
+        //wait till 1st instance of replication coord is SUCCEEDED
+        String bundleId = InstanceUtil
+            .getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+        List<String> replicationCoordIDTarget = InstanceUtil.getReplicationCoordID(bundleId,
+            cluster1.getFeedHelper());
+
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+            logger.info("still in for loop");
+            TimeUtil.sleepSeconds(20);
+        }
+
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(0), 0),
+            WorkflowJob.Status.SUCCEEDED);
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0),
+            WorkflowJob.Status.SUCCEEDED);
+
+        TimeUtil.sleepSeconds(15);
+
+        List<String> inputFolderListForColo1 = InstanceUtil
+            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+                1);
+        List<String> inputFolderListForColo2 = InstanceUtil
+            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1),
+                1);
+
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo2);
+
+        //sleep till late starts
+        TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+        //check for run id to  be 1
+        Assert.assertEquals(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+            1, "id has to be equal 1");
+        Assert.assertEquals(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+            1, "id has to be equal 1");
+
+        //wait for lates run to complete
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+            logger.info("still in for loop");
+            TimeUtil.sleepSeconds(20);
+        }
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(0), 0),
+            WorkflowJob.Status.SUCCEEDED);
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0),
+            WorkflowJob.Status.SUCCEEDED);
+
+        TimeUtil.sleepSeconds(30);
+
+        //put data for the second time
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+            + "2ndLateData", inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+            + "2ndLateData", inputFolderListForColo2);
+
+        //sleep till late 2 starts
+        TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
+
+        //check for run id to be 2
+        Assert.assertEquals(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+            2, "id has to be equal 2");
+        Assert.assertEquals(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+            2, "id has to be equal 2");
+    }
+
+    /**
+     * this test case does the following
+     * two source ua2 and ua3
+     * ua3 has following part data
+     * ua1/ua2
+     * ua1/ua2
+     * ua1/ua2
+     * <p/>
+     * ua2 has following part data
+     * ua1/ua3
+     * ua1/ua3
+     * ua1/ua3
+     * <p/>
+     * ua1 is the target, which in the end should have all ua1 data
+     * <p/>
+     * after first instance succeed data in put into relevant source and late should rerun
+     * <p/>
+     * after first late succeed data is put into other source and late should not
+     */
+
+    @Test(groups = {"multiCluster"})
+    public void mixedTest01() throws Exception {
+
+        bundles[0].setInputFeedDataPath(inputPath);
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "ua1/${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+            null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "ua1/${cluster.colo}");
+
+        //create data in colos
+
+        String postFix = "/ua1/ua2";
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua2/ua2";
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua3/ua2";
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        //put _SUCCESS in parent folder UA2
+        HadoopUtil.putFileInFolderHDFS(cluster2FS, 90, 1, prefix, "_SUCCESS");
+
+        postFix = "/ua1/ua3";
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua2/ua3";
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua3/ua3";
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        //put _SUCCESS in parent folder of UA3
+        HadoopUtil.putFileInFolderHDFS(cluster3FS, 90, 1, prefix, "_SUCCESS");
+
+        //submit and schedule feed
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+
+        //wait till 1st instance of replication coord is SUCCEEDED
+        String bundleId =
+            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+        List<String> replicationCoordIDTarget =
+            InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+            logger.info("still in for loop");
+            TimeUtil.sleepSeconds(20);
+        }
+
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
+            "Replication job should have succeeded.");
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
+            "Replication job should have succeeded.");
+
+        TimeUtil.sleepSeconds(15);
+
+        //check for exact folders to be created in ua1 :  ua1/ua2 and ua1/ua3 no other should
+        // be present. both of them should have _success
+
+
+        List<String> inputFolderListForColo1 = InstanceUtil
+            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+                1);
+        List<String> inputFolderListForColo2 = InstanceUtil
+            .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1),
+                1);
+
+        String outPutLocation = InstanceUtil
+            .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+                0);
+        String outPutBaseLocation = InstanceUtil
+            .getOutputFolderBaseForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(0), 0);
+
+        List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
+
+        Assert.assertEquals(subfolders.size(), 1);
+        Assert.assertEquals(subfolders.get(0), "ua1");
+
+        Assert.assertFalse(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutBaseLocation,
+            "_SUCCESS"));
+
+        Assert.assertTrue(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutLocation, "_SUCCESS"));
+
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo2);
+
+        //sleep till late starts
+        TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+        //check for run id to  be 1
+        Assert.assertTrue(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) ==
+                1
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                0) == 1,
+            "id have to be equal 1");
+
+
+        //wait for lates run to complete
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+            logger.info("still in for loop");
+            TimeUtil.sleepSeconds(20);
+        }
+
+
+        TimeUtil.sleepSeconds(30);
+
+        //put data for the second time
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+            + "2ndLateData", inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+            + "2ndLateData", inputFolderListForColo2);
+
+        //sleep till late 2 starts
+        TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
+
+        //check for run id to be 2
+        Assert.assertTrue(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) ==
+                2
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                0) == 2,
+            "id have to be equal 2");
+    }
+
+    /**
+     * only difference between mixed 01 and 02 is of availability flag. feed has _success as
+     * availability flag ...so replication should not start till _success is put in ua2
+     * <p/>
+     * this test case does the following
+     * two source ua2 and ua3
+     * ua3 has follwing part data
+     * ua1/ua2
+     * ua1/ua2
+     * ua1/ua2
+     * <p/>
+     * ua2 has following part data
+     * ua1/ua3
+     * ua1/ua3
+     * ua1/ua3
+     * <p/>
+     * ua1 is the target, which in the end should have all ua1 data
+     * after first instance succeed data in put into relevant source and late should rerun
+     * after first late succeed data is put into other source and late should not rerun
+     */
+    @Test(groups = {"multiCluster"})
+    public void mixedTest02() throws Exception {
+        bundles[0].setInputFeedDataPath(inputPath);
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        //set availability flag as _success
+        bundles[0].setInputFeedAvailabilityFlag("_SUCCESS");
+
+        //get feed
+        String feed = bundles[0].getDataSets().get(0);
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+            ClusterType.SOURCE, null);
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(3);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+            "ua1/${cluster.colo}");
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+            null);
+
+        feed = InstanceUtil.setFeedCluster(feed,
+            XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+            XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+            "ua1/${cluster.colo}");
+
+        //create data in colos
+
+        String postFix = "/ua1/ua2";
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua2/ua2";
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua3/ua2";
+        HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+        //put _SUCCESS in parent folder UA2
+        HadoopUtil.putFileInFolderHDFS(cluster2FS, 90, 1, prefix, "_SUCCESS");
+
+        postFix = "/ua1/ua3";
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua2/ua3";
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        postFix = "/ua3/ua3";
+        HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+        //put _SUCCESS in parent folder of UA3
+        HadoopUtil.putFileInFolderHDFS(cluster3FS, 90, 1, prefix, "_SUCCESS");
+
+        TimeUtil.sleepSeconds(15);
+
+        //submit and schedule feed
+        logger.info("feed: " + Util.prettyPrintXml(feed));
+
+        prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+        TimeUtil.sleepSeconds(10);
+
+        //wait till 1st instance of replication coord is SUCCEEDED
+        String bundleId =
+            InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+        List<String> replicationCoordIDTarget =
+            InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+        for (int i = 0; i < 30; i++) {
+            if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0),
+                0)
+                == WorkflowJob.Status.SUCCEEDED
+                && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0)
+                == WorkflowJob.Status.SUCCEEDED) {
+                break;
+            }
+
+            logger.info("still in for loop");
+            TimeUtil.sleepSeconds(20);
+        }
+
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
+            "Replication job did not succeed");
+        Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+                replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
+            "Replication job did not succeed");
+
+        TimeUtil.sleepSeconds(15);
+
+        /* check for exact folders to be created in ua1 :  ua1/ua2 and ua1/ua3 no other should
+           be present. both of
+           them should have _success */
+        List<String> inputFolderListForColo1 =
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(0), 1);
+        List<String> inputFolderListForColo2 =
+            InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(1), 1);
+
+        String outPutLocation = InstanceUtil
+            .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+                0);
+        String outPutBaseLocation = InstanceUtil
+            .getOutputFolderBaseForInstanceForReplication(cluster1,
+                replicationCoordIDTarget.get(0), 0);
+
+        List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
+
+        Assert.assertEquals(subfolders.size(), 1);
+        Assert.assertEquals(subfolders.get(0), "ua1");
+
+        Assert.assertFalse(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutBaseLocation,
+            "_SUCCESS"));
+
+        Assert.assertTrue(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutLocation, "_SUCCESS"));
+
+        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo1);
+        HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+            inputFolderListForColo2);
+
+        //sleep till late starts
+        TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+        //check for run id to  be 1
+        Assert.assertTrue(
+            InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) ==
+                1
+                && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1),
+                0) == 1,
+            "id have to be equal 1");
+    }
+}