You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:06 UTC

[07/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
new file mode 100644
index 0000000..1fe768a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
@@ -0,0 +1,1009 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismProcessDeleteTest extends BaseTestClass {
+
+    // Template bundle (re)loaded in setUp(); the per-server copies live in bundles[].
+    Bundle bundle;
+    // First and second entries of the servers list. The tests below keep cluster2's state in
+    // variables named "UA1" and cluster1's state in variables named "UA2".
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    // Directory under baseHDFSDir that the workflow is uploaded to and that every bundle's
+    // process workflow is pointed at.
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessDeleteTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismProcessDeleteTest.class);
+
+    /**
+     * Uploads the Oozie workflow resources into {@code aggregateWorkflowDir} on the clusters
+     * once, before any test of this class runs.
+     *
+     * @throws Exception if the upload fails
+     */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        final String workflowSource = OSUtil.RESOURCES_OOZIE;
+        uploadDirToClusters(aggregateWorkflowDir, workflowSource);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundle = BundleUtil.readLateDataBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    /** Runs after every test method and delegates the cleanup to {@code removeBundles()}. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /* NOTE: All test cases assume that there are two entities scheduled in each colo.
+        com.inmobi.qa.airavatqa.prism.PrismProcessDeleteTest
+        .testUA1ProcessDeleteAlreadyDeletedProcess */
+
+
+    /**
+     * Schedules a process through prism, deletes it through prism and compares the store and
+     * archive listings taken before and after the delete.
+     *
+     * <p>Expected outcome: the process leaves the store and shows up in the archive on prism
+     * and on UA1 (cluster2), while UA2 (cluster1) is left unchanged.
+     *
+     * @throws Exception if any call to prism or to the colos fails
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteInBothColos() throws Exception {
+        //submit the feeds and schedule the process
+        bundles[0].submitFeedsScheduleProcess();
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now delete the process through prism
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now ensure that the process has been removed from the stores and is present in the
+        // archives
+
+        // Read the name from the bundle that was actually submitted and deleted (bundles[0]),
+        // not from the template bundle it was copied from.
+        String processName = Util.readEntityName(bundles[0].getProcessData());
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+        //UA1:
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
+
+        //UA2: nothing may have changed
+        compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+        compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteWhen1ColoIsDown() throws Exception {
+        try {
+            //now submit the thing to prism
+            bundles[0].submitFeedsScheduleProcess();
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+
+            //bring down UA2 colo :P
+            Util.shutDownService(cluster2.getClusterHelper());
+
+            //lets now delete the cluster from both colos
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //now ensure that data has been deleted from all cluster store and is present in the
+            // cluster archives
+
+            String clusterName = Util.readEntityName(bundle.getProcessData());
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA2ArchiveStore);
+
+            //bring service up
+            Util.startService(cluster2.getProcessHelper());
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+            compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+                clusterName);
+
+            compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+            compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+            compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+                clusterName);
+        } catch (Exception e) {
+            logger.info(e.getMessage());
+            throw new TestNGException(e.getMessage());
+        } finally {
+            Util.restartService(cluster2.getClusterHelper());
+        }
+    }
+
+
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteAlreadyDeletedProcess() throws Exception {
+        try {
+            //now submit the thing to prism
+            bundles[0].submitFeedsScheduleProcess();
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //now ensure that data has been deleted from all cluster store and is present in the
+            // cluster archives
+
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+        } catch (Exception e) {
+            logger.info(e.getMessage());
+            throw new TestNGException(e.getMessage());
+        }
+    }
+
+
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteTwiceWhen1ColoIsDownDuring1stDelete()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+
+            Util.shutDownService(cluster2.getClusterHelper());
+
+            //lets now delete the cluster from both colos
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //now lets get the final states
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //start up service
+            Util.startService(cluster2.getClusterHelper());
+
+            //delete again
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //get final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //now ensure that data has been deleted from all cluster store and is present in the
+            // cluster archives
+
+            String clusterName = Util.readEntityName(bundle.getProcessData());
+            //prism:
+            compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+            //UA1:
+            compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+            compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+        } catch (Exception e) {
+            logger.info(e.getMessage());
+            throw new TestNGException(e.getMessage());
+        } finally {
+            Util.restartService(cluster2.getClusterHelper());
+        }
+    }
+
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteNonExistent() throws Exception {
+        try {
+            //now lets get the final states
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //delete
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //get final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //now ensure that data has been deleted from all cluster store and is present in the
+            // cluster archives
+
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+        } catch (Exception e) {
+            logger.info(e.getMessage());
+            throw new TestNGException(e.getMessage());
+        }
+    }
+
+
+    @Test(groups = {"prism", "0.2"})
+    public void testUA1ProcessDeleteNonExistentWhen1ColoIsDownDuringDelete()
+        throws Exception {
+        try {
+            //now lets get the final states
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //bring down UA1
+            Util.shutDownService(cluster2.getClusterHelper());
+
+            //delete
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+            //get final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //now ensure that data has been deleted from all cluster store and is present in the
+            // cluster archives
+
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            Util.startService(cluster2.getClusterHelper());
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+        } catch (Exception e) {
+            logger.info(e.getMessage());
+            throw new TestNGException(e.getMessage());
+        } finally {
+            Util.restartService(cluster2.getClusterHelper());
+        }
+    }
+
+
+    /**
+     * Schedules one process per bundle and deletes the first bundle's process through prism.
+     *
+     * <p>Expected outcome: the deleted process leaves the store and shows up in the archive on
+     * prism and on UA1 (cluster2), while UA2 (cluster1) is left unchanged.
+     *
+     * @throws Exception if any call to prism or to the colos fails
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessScheduledInOneColo() throws Exception {
+        bundles[0].submitFeedsScheduleProcess();
+        bundles[1].submitFeedsScheduleProcess();
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now delete the first bundle's process through prism
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now ensure that the process has been removed from the stores and is present in the
+        // archives
+
+        // Read the name from the bundle whose process was deleted (bundles[0]), not from the
+        // template bundle it was copied from.
+        String processName = Util.readEntityName(bundles[0].getProcessData());
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+        //UA1:
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
+
+        //UA2: nothing may have changed
+        compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+        compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+    }
+
+    /**
+     * Schedules one process per bundle, suspends the first bundle's process through prism and
+     * then deletes that suspended process through prism.
+     *
+     * <p>Expected outcome: the deleted process leaves the store and shows up in the archive on
+     * prism and on UA1 (cluster2), while UA2 (cluster1) is left unchanged.
+     *
+     * @throws Exception if any call to prism or to the colos fails
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessSuspendedInOneColo() throws Exception {
+        // schedule a process for each of the two bundles
+        bundles[0].submitFeedsScheduleProcess();
+        bundles[1].submitFeedsScheduleProcess();
+
+        // suspend only the first bundle's process
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+        // snapshot before the delete: prism first ...
+        List<String> prismStoreBefore = prism.getProcessHelper().getStoreInfo();
+        List<String> prismArchiveBefore = prism.getProcessHelper().getArchiveInfo();
+
+        // ... then UA1 (cluster2) and UA2 (cluster1)
+        List<String> ua1StoreBefore = cluster2.getProcessHelper().getStoreInfo();
+        List<String> ua1ArchiveBefore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> ua2StoreBefore = cluster1.getProcessHelper().getStoreInfo();
+        List<String> ua2ArchiveBefore = cluster1.getProcessHelper().getArchiveInfo();
+
+        // delete the suspended process through prism
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        // snapshot after the delete, in the same order as before
+        List<String> prismStoreAfter = prism.getProcessHelper().getStoreInfo();
+        List<String> prismArchiveAfter = prism.getProcessHelper().getArchiveInfo();
+
+        List<String> ua1StoreAfter = cluster2.getProcessHelper().getStoreInfo();
+        List<String> ua1ArchiveAfter = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> ua2StoreAfter = cluster1.getProcessHelper().getStoreInfo();
+        List<String> ua2ArchiveAfter = cluster1.getProcessHelper().getArchiveInfo();
+
+        // name of the process that was deleted
+        String deletedProcess = Util.readEntityName(bundles[0].getProcessData());
+
+        // prism: process gone from the store, present in the archive
+        compareDataStoreStates(prismStoreBefore, prismStoreAfter, deletedProcess);
+        compareDataStoreStates(prismArchiveAfter, prismArchiveBefore, deletedProcess);
+
+        // UA1: same expectation as for prism
+        compareDataStoreStates(ua1StoreBefore, ua1StoreAfter, deletedProcess);
+        compareDataStoreStates(ua1ArchiveAfter, ua1ArchiveBefore, deletedProcess);
+
+        // UA2: store and archive must be unchanged
+        compareDataStoresForEquality(ua2StoreBefore, ua2StoreAfter);
+        compareDataStoresForEquality(ua2ArchiveAfter, ua2ArchiveBefore);
+    }
+
+
+    /**
+     * Schedules one process per bundle, suspends both through prism and then deletes the first
+     * bundle's process through prism.
+     *
+     * <p>Expected outcome: the deleted process leaves the store and shows up in the archive on
+     * prism and on UA1 (cluster2), while UA2 (cluster1) is left unchanged.
+     *
+     * @throws Exception if any call to prism or to the colos fails
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessSuspendedInOneColoWhileBothProcessesAreSuspended()
+        throws Exception {
+        bundles[0].submitFeedsScheduleProcess();
+        bundles[1].submitFeedsScheduleProcess();
+
+        //suspend both processes
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+        //fetch the initial store and archive state for prism
+        List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the initial store and archive for both colos
+        List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now delete the first bundle's process through prism
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        //now lets get the final states
+        List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+        List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+        //fetch the final store and archive for both colos
+        List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+        List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+        List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+        List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+        //now ensure that the process has been removed from the stores and is present in the
+        // archives
+
+        // Read the name from the bundle whose process was deleted (bundles[0]), not from the
+        // template bundle it was copied from.
+        String processName = Util.readEntityName(bundles[0].getProcessData());
+        //prism:
+        compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+        compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+        //UA1:
+        compareDataStoreStates(initialUA1Store, finalUA1Store, processName);
+        compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName);
+
+        //UA2: nothing may have changed
+        compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+        compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+    }
+
+    /**
+     * Schedules one process per bundle, suspends the first bundle's process, shuts down the
+     * UA1 colo (cluster2) and then tries to delete the suspended process through prism.
+     *
+     * <p>The delete is expected to fail and to leave every store and archive (prism, UA1, UA2)
+     * unchanged.
+     *
+     * @throws Exception declared for TestNG; exceptions raised inside the test are rethrown as
+     *                   {@link TestNGException} carrying the original exception as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessSuspendedInOneColoWhileThatColoIsDown()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+            bundles[1].submitFeedsScheduleProcess();
+
+            //suspend the first bundle's process
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+            );
+
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //shutdown UA1
+            Util.shutDownService(cluster2.getFeedHelper());
+
+            //the delete through prism must fail while the colo is down
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //the failed delete must not have changed any store or archive
+
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+        } catch (Exception e) {
+            // report through the class logger instead of printing to stderr
+            logger.error(e.getMessage(), e);
+            // keep the original exception as cause so its stack trace is not lost
+            throw new TestNGException(e.getMessage(), e);
+        } finally {
+            // always leave the colo running for the following tests
+            Util.restartService(cluster2.getFeedHelper());
+        }
+    }
+
+
+    /**
+     * Calls submitFeedsScheduleProcess() on both bundles, shuts down cluster2 and checks that
+     * deleting the first bundle's process through prism fails and leaves every store and
+     * archive unchanged. After cluster2 is started again the same delete has to succeed:
+     * the prism and cluster2 stores each lose one entry named after the process, the
+     * corresponding archives each gain one, and cluster1 stays as it was.
+     *
+     * @throws Exception declared for the helper calls; exceptions raised inside the try block
+     *                   are rethrown as TestNGException with the original as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessScheduledInOneColoWhileThatColoIsDown()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+            bundles[1].submitFeedsScheduleProcess();
+
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //shutdown UA1 (cluster2)
+            Util.shutDownService(cluster2.getFeedHelper());
+
+            //deleting the first process through prism must fail while UA1 is down
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //the failed delete must not have changed any store or archive
+
+            String processName = Util.readEntityName(bundles[0].getProcessData());
+            //prism:
+            compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+            compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+            //UA1:
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            //UA2:
+            compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+            compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+
+            //bring UA1 back; the same delete has to succeed now
+            Util.startService(cluster2.getClusterHelper());
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+            HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+            //UA2 must be unaffected by the delete
+            compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+            compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+            //prism: one store entry for the process is gone, one archive entry was added
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
+            compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+                processName);
+
+            //UA1: same expectation as for prism
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
+            compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+                processName);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            //keep the original exception as cause so its stack trace reaches the test report
+            throw new TestNGException(e.getMessage(), e);
+        } finally {
+            Util.restartService(cluster2.getFeedHelper());
+        }
+    }
+
+    /**
+     * Calls submitFeedsScheduleProcess() on both bundles, suspends the second bundle's process
+     * through prism, shuts down cluster2 and deletes the suspended process through prism.
+     * The delete has to succeed: the prism and cluster1 stores each lose one entry named after
+     * the process, their archives each gain one, and cluster2 stays unchanged.
+     *
+     * @throws Exception declared for the helper calls; exceptions raised inside the try block
+     *                   are rethrown as TestNGException with the original as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessSuspendedInOneColoWhileAnotherColoIsDown()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+            bundles[1].submitFeedsScheduleProcess();
+
+            //suspend the second process through prism
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+            );
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //shutdown UA1 (cluster2)
+            Util.shutDownService(cluster2.getFeedHelper());
+
+            //deleting the suspended process through prism must succeed although UA1 is down
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+            );
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //the process must have moved from store to archive on prism and UA2 only
+
+            //take the expected name from the bundle that was actually deleted
+            String processName = Util.readEntityName(bundles[1].getProcessData());
+            //prism:
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+            //UA1 (down during the delete, nothing may change):
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            //UA2:
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
+        } catch (Exception e) {
+            e.printStackTrace();
+            //keep the original exception as cause so its stack trace reaches the test report
+            throw new TestNGException(e.getMessage(), e);
+        } finally {
+            Util.restartService(cluster2.getFeedHelper());
+        }
+    }
+
+
+    /**
+     * Calls submitFeedsScheduleProcess() on both bundles, suspends both processes through
+     * prism, shuts down cluster2 and deletes the second bundle's process through prism.
+     * The delete has to succeed: the prism and cluster1 stores each lose one entry named after
+     * the process, their archives each gain one, and cluster2 stays unchanged.
+     *
+     * @throws Exception declared for the helper calls; exceptions raised inside the try block
+     *                   are rethrown as TestNGException with the original as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessSuspendedInOneColoWhileAnotherColoIsDownWithFeedSuspended()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+            bundles[1].submitFeedsScheduleProcess();
+
+            //suspend the second process through prism
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+            );
+            //also suspend the first one (the test name says feed, the call targets a process)
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //shutdown UA1 (cluster2)
+            Util.shutDownService(cluster2.getFeedHelper());
+
+            //deleting the second process through prism must succeed although UA1 is down
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+            );
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //the process must have moved from store to archive on prism and UA2 only
+
+            //take the expected name from the bundle that was actually deleted
+            String processName = Util.readEntityName(bundles[1].getProcessData());
+            //prism:
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+            //UA1 (down during the delete, nothing may change):
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            //UA2:
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
+        } catch (Exception e) {
+            e.printStackTrace();
+            //keep the original exception as cause so its stack trace reaches the test report
+            throw new TestNGException(e.getMessage(), e);
+        } finally {
+            Util.restartService(cluster2.getFeedHelper());
+        }
+    }
+
+    /**
+     * Calls submitFeedsScheduleProcess() on both bundles, shuts down cluster2 and deletes the
+     * second bundle's process through prism. That delete has to succeed and move one entry
+     * from store to archive on prism and cluster1 while cluster2 stays unchanged. After
+     * cluster2 is started again the first bundle's process is deleted as well, which must
+     * move one entry from store to archive on prism and cluster2 and leave cluster1 alone.
+     *
+     * @throws Exception declared for the helper calls; exceptions raised inside the try block
+     *                   are rethrown as TestNGException with the original as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testDeleteProcessScheduledInOneColoWhileAnotherColoIsDown()
+        throws Exception {
+        try {
+            bundles[0].submitFeedsScheduleProcess();
+            bundles[1].submitFeedsScheduleProcess();
+
+            //fetch the initial store and archive state for prism
+            List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the initial store and archive for both colos
+            List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //shutdown UA1 (cluster2)
+            Util.shutDownService(cluster2.getFeedHelper());
+
+            //deleting the second process through prism must succeed although UA1 is down
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+            );
+
+            //now lets get the final states
+            List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+            List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+            //fetch the final store and archive for both colos
+            List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+            List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+            List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+            List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+            //the process must have moved from store to archive on prism and UA2 only
+
+            String processName = Util.readEntityName(bundles[1].getProcessData());
+            //prism:
+            compareDataStoreStates(initialPrismStore, finalPrismStore, processName);
+            compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName);
+
+            //UA1 (down during the delete, nothing may change):
+            compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+            compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+            //UA2:
+            compareDataStoreStates(initialUA2Store, finalUA2Store, processName);
+            compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName);
+
+
+            //bring UA1 back and delete the first process as well
+            Util.startService(cluster2.getClusterHelper());
+
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+            HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+            processName = Util.readEntityName(bundles[0].getProcessData());
+
+            //UA2 must be unaffected by the second delete
+            compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+            compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+            //prism: one store entry for the process is gone, one archive entry was added
+            compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName);
+            compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+                processName);
+
+            //UA1: same expectation as for prism
+            compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName);
+            compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+                processName);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            //keep the original exception as cause so its stack trace reaches the test report
+            throw new TestNGException(e.getMessage(), e);
+        } finally {
+            Util.restartService(cluster2.getFeedHelper());
+        }
+    }
+
+
+    /**
+     * Asserts that exactly one entry of {@code initialState} is absent from
+     * {@code finalState} and that this entry contains {@code filename}.
+     *
+     * @param initialState listing expected to hold the one extra entry
+     * @param finalState   listing whose entries are subtracted from {@code initialState}
+     * @param filename     text the single leftover entry must contain
+     */
+    private void compareDataStoreStates(List<String> initialState, List<String> finalState,
+                                        String filename) {
+
+        //work on a copy so that removeAll does not modify the caller's list
+        List<String> temp = new ArrayList<String>(initialState);
+        temp.removeAll(finalState);
+        Assert.assertEquals(temp.size(), 1,
+            "Expected exactly one entry missing from the final state but found: " + temp);
+        Assert.assertTrue(temp.get(0).contains(filename),
+            "Entry " + temp.get(0) + " does not contain " + filename);
+
+    }
+
+
+    /**
+     * Asserts that both listings hold equal entries in the same order.
+     *
+     * @param store1 first listing
+     * @param store2 second listing
+     */
+    private void compareDataStoresForEquality(List<String> store1, List<String> store2) {
+        //the message exposes both listings; a bare assertTrue would only report "false"
+        Assert.assertTrue(Arrays.deepEquals(store2.toArray(new String[store2.size()]),
+            store1.toArray(new String[store1.size()])),
+            "Data stores differ: " + store1 + " vs " + store2);
+    }
+
+    /**
+     * Collects the current archive and store listings of prism, cluster2 ("ua1") and
+     * cluster1 ("ua2").
+     *
+     * @param entityType FEED or PROCESS selects the feed or process helpers; any other value
+     *                   falls back to the cluster helpers
+     * @return map with the keys prismArchive, prismStore, ua1Archive, ua1Store, ua2Archive
+     *         and ua2Store
+     * @throws Exception propagated from the helper calls
+     */
+    public HashMap<String, List<String>> getSystemState(EntityType entityType) throws Exception {
+        final IEntityManagerHelper prismHelper;
+        final IEntityManagerHelper ua1Helper;
+        final IEntityManagerHelper ua2Helper;
+
+        if (entityType == EntityType.PROCESS) {
+            prismHelper = prism.getProcessHelper();
+            ua1Helper = cluster2.getProcessHelper();
+            ua2Helper = cluster1.getProcessHelper();
+        } else if (entityType == EntityType.FEED) {
+            prismHelper = prism.getFeedHelper();
+            ua1Helper = cluster2.getFeedHelper();
+            ua2Helper = cluster1.getFeedHelper();
+        } else {
+            prismHelper = prism.getClusterHelper();
+            ua1Helper = cluster2.getClusterHelper();
+            ua2Helper = cluster1.getClusterHelper();
+        }
+
+        final HashMap<String, List<String>> snapshot = new HashMap<String, List<String>>();
+        snapshot.put("prismArchive", prismHelper.getArchiveInfo());
+        snapshot.put("prismStore", prismHelper.getStoreInfo());
+        snapshot.put("ua1Archive", ua1Helper.getArchiveInfo());
+        snapshot.put("ua1Store", ua1Helper.getStoreInfo());
+        snapshot.put("ua2Archive", ua2Helper.getArchiveInfo());
+        snapshot.put("ua2Store", ua2Helper.getStoreInfo());
+        return snapshot;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
new file mode 100644
index 0000000..2e0fbaf
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+@Test(groups = "distributed")
+public class PrismProcessResumeTest extends BaseTestClass {
+
+    //colo helpers and Oozie clients taken from the first two entries of servers / serverOC
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    //workflow directory passed to uploadDirToClusters and set on every bundle in setUp()
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessResumeTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismProcessResumeTest.class);
+
+    /**
+     * Runs once before the tests of this class and calls uploadDirToClusters with the
+     * aggregator workflow directory and OSUtil.RESOURCES_OOZIE.
+     *
+     * @throws Exception propagated from uploadDirToClusters
+     */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLateDataBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    /**
+     * Calls removeBundles() after each test method.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Schedule one process per colo, then suspend and resume them in turn - first through
+     * prism, then through each colo's own helper - and check the Oozie status of both
+     * processes after every step.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeSuspendedFeedOnBothColos() throws Exception {
+        //each bundle is scheduled directly on its colo
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+        final String processOnCluster2 = bundles[0].getProcessData();
+        final String processOnCluster1 = bundles[1].getProcessData();
+
+        //prism suspends the cluster2 process; the cluster1 process keeps running
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //prism resumes it again
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //the same round trip through cluster2's own helper: suspend ...
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //... and resume
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //now the cluster1 process through cluster1's helper: suspend ...
+        AssertUtil.assertSucceeded(
+            cluster1.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, processOnCluster1));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+        //... and resume
+        AssertUtil.assertSucceeded(
+            cluster1.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster1));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Schedule one process per colo and delete them one after another through prism.
+     * Resuming a deleted process has to fail, through prism as well as through the colo
+     * helpers, and the deleted process has to stay KILLED.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeDeletedProcessOnBothColos() throws Exception {
+        //each bundle is scheduled directly on its colo
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+        final String processOnCluster2 = bundles[0].getProcessData();
+        final String processOnCluster1 = bundles[1].getProcessData();
+
+        //remove the cluster2 process through prism
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, processOnCluster2));
+
+        //prism must refuse to resume it; only the cluster1 process is still running
+        AssertUtil.assertFailed(
+            prism.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //remove the cluster1 process through prism as well
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, processOnCluster1));
+
+        //prism must refuse to resume that one too; both processes are killed now
+        AssertUtil.assertFailed(
+            prism.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster1));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+
+        //cluster2's own helper must refuse to resume the deleted process
+        AssertUtil.assertFailed(
+            cluster2.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+        //and so must cluster1's helper
+        AssertUtil.assertFailed(
+            cluster1.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster1));
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+    }
+
+    /**
+     * Schedule one process per colo. Suspend each of them once and resume it twice in a row,
+     * through prism and through the colo helpers: resuming a process that is already running
+     * has to succeed and leave both statuses as they were.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeResumedProcessOnBothColos() throws Exception {
+        //each bundle is scheduled directly on its colo
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+        final String processOnCluster2 = bundles[0].getProcessData();
+        final String processOnCluster1 = bundles[1].getProcessData();
+
+        //suspend the cluster2 process through prism
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, processOnCluster2));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+
+        //first pass resumes the suspended process, second pass resumes a running one
+        for (int attempt = 0; attempt < 2; attempt++) {
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        }
+
+        //suspend the cluster1 process through prism
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, processOnCluster1));
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+        //resuming the running cluster2 process must not wake up the cluster1 process
+        for (int attempt = 0; attempt < 2; attempt++) {
+            AssertUtil.assertSucceeded(
+                cluster2.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster2));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1],
+                Job.Status.SUSPENDED);
+        }
+
+        //resume the cluster1 process through prism, twice
+        for (int attempt = 0; attempt < 2; attempt++) {
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster1));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        }
+
+        //and twice more through cluster1's own helper
+        for (int attempt = 0; attempt < 2; attempt++) {
+            AssertUtil.assertSucceeded(
+                cluster1.getProcessHelper().resume(Util.URLS.RESUME_URL, processOnCluster1));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        }
+    }
+
+    /**
+     * Attempt to resume non-existent process should fail through both prism and colohelpers.
+     * Neither bundle is submitted; each process is tried through prism and through the helper
+     * of one colo.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testResumeNonExistentProcessOnBothColos() throws Exception {
+        AssertUtil.assertFailed(prism.getProcessHelper()
+            .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+        AssertUtil.assertFailed(prism.getProcessHelper()
+            .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+        AssertUtil.assertFailed(cluster2.getProcessHelper()
+            .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+        //second bundle on cluster1, matching the pairing used by the other tests of this class
+        AssertUtil.assertFailed(cluster1.getProcessHelper()
+            .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+    }
+
+    /**
+     * Attempt to resume a process which was only submitted (never scheduled) should fail,
+     * through prism as well as through the colo helpers.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testResumeSubmittedProcessOnBothColos() throws Exception {
+        //submit only, nothing gets scheduled
+        bundles[0].submitProcess(true);
+        bundles[1].submitProcess(true);
+        final String firstProcess = bundles[0].getProcessData();
+        final String secondProcess = bundles[1].getProcessData();
+
+        //through prism
+        AssertUtil.assertFailed(
+            prism.getProcessHelper().resume(Util.URLS.RESUME_URL, firstProcess));
+        AssertUtil.assertFailed(
+            prism.getProcessHelper().resume(Util.URLS.RESUME_URL, secondProcess));
+
+        //through the colo helpers
+        AssertUtil.assertFailed(
+            cluster2.getProcessHelper().resume(Util.URLS.RESUME_URL, firstProcess));
+        AssertUtil.assertFailed(
+            cluster1.getProcessHelper().resume(Util.URLS.RESUME_URL, secondProcess));
+    }
+
+    /**
+     * Schedule processes on both servers and then suspend them. Shutdown server. Check that it's
+     * impossible to resume process on this server and possible on another server.
+     *
+     * @throws Exception declared for the helper calls; exceptions raised inside the try block
+     *                   are rethrown as TestNGException with the original as cause
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeScheduledProcessOnBothColosWhen1ColoIsDown()
+        throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+            bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+            AssertUtil.assertSucceeded(
+                cluster2.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+            );
+            AssertUtil.assertSucceeded(
+                cluster1.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+            );
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            //resuming the process of the stopped colo through prism must fail
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+            //verify
+            AssertUtil
+                .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+            //resume on the other one
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+            AssertUtil
+                .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+        } catch (Exception e) {
+            e.printStackTrace();
+            //wrap the exception itself: e.getCause() is often null and would lose the failure
+            throw new TestNGException(e);
+        } finally {
+            Util.restartService(cluster2.getFeedHelper());
+        }
+
+    }
+
+    /**
+     * Schedule one process per colo. Delete the first process through prism and shut down
+     * the service behind cluster2. Resuming the deleted process must fail both through prism
+     * and through the cluster2 helper, while the second process keeps running. Then delete the
+     * second process through prism and check that resuming it fails as well, both through
+     * prism and through the cluster1 helper.
+     *
+     * @throws Exception any exception raised by a step, rethrown wrapped in a TestNGException
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeDeletedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+            bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+            //delete the first process using prism
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+            );
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            //try to resume using prism
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+            //verify
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+            //try to resume using colohelper
+            AssertUtil.assertFailed(
+                cluster2.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData())
+            );
+            //verify
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+            //delete the second process using prism
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+            );
+            //try to resume the second (now deleted) process using prism
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+
+            //try to resume the second process using colohelper
+            AssertUtil.assertFailed(
+                cluster1.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+            );
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+        } catch (Exception e) {
+            e.printStackTrace();
+            //wrap the exception itself: getCause() is null unless the exception was chained,
+            //and wrapping null would hide the real failure from the TestNG report
+            throw new TestNGException(e);
+        } finally {
+            //restart through the same helper that was used for the shutdown
+            Util.restartService(cluster2.getProcessHelper());
+        }
+    }
+
+    /**
+     * Schedule one process per colo. Suspend and resume the first process through the
+     * cluster2 helper, then shut down the service behind cluster2. Resuming the first process
+     * through prism must now fail. Suspend the second process through prism, then resume it
+     * twice through prism and twice through the cluster1 helper; every resume must succeed
+     * and both processes must be reported as running.
+     *
+     * @throws Exception any exception raised by a step, rethrown wrapped in a TestNGException
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeResumedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        try {
+            //schedule using colohelpers
+            bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+            bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+            //suspend using colohelper
+            AssertUtil.assertSucceeded(
+                cluster2.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+            );
+            //verify
+            AssertUtil
+                .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+            //resume using colohelper
+            AssertUtil.assertSucceeded(
+                cluster2.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            //resuming the already resumed process through prism must fail now
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+
+            //suspend the other process using prism
+            AssertUtil.assertSucceeded(
+                prism.getProcessHelper()
+                    .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+            AssertUtil
+                .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+            for (int i = 0; i < 2; i++) {
+                //resume the other one using prism; the second pass resumes a running process
+                AssertUtil.assertSucceeded(
+                    prism.getProcessHelper()
+                        .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+                AssertUtil
+                    .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+                AssertUtil
+                    .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+            }
+
+            for (int i = 0; i < 2; i++) {
+                //resume the other one using colohelper
+                AssertUtil.assertSucceeded(
+                    cluster1.getProcessHelper()
+                        .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+                AssertUtil
+                    .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+                AssertUtil
+                    .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            //wrap the exception itself: getCause() is null unless the exception was chained,
+            //and wrapping null would hide the real failure from the TestNG report
+            throw new TestNGException(e);
+        } finally {
+            Util.restartService(cluster2.getProcessHelper());
+        }
+    }
+
+    /**
+     * Shut down the service behind cluster2 without submitting anything. Resuming either
+     * process through prism, or the second process through the cluster1 helper, must fail.
+     *
+     * @throws Exception any exception raised by a step, rethrown wrapped in a TestNGException
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeNonExistentProcessOnBothColosWhen1ColoIsDown()
+        throws Exception {
+        try {
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            //neither process was submitted in this test, so every resume is expected to fail
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+            AssertUtil.assertFailed(
+                cluster1.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            //wrap the exception itself: getCause() is null unless the exception was chained,
+            //and wrapping null would hide the real failure from the TestNG report
+            throw new TestNGException(e);
+        } finally {
+            Util.restartService(cluster2.getProcessHelper());
+        }
+    }
+
+    /**
+     * Submit (but do not schedule) both processes. Shut down the service behind cluster2.
+     * Resuming either process through prism, or the second process through the cluster1
+     * helper, must fail.
+     *
+     * @throws Exception any exception raised by a step, rethrown wrapped in a TestNGException
+     */
+    @Test(groups = {"prism", "0.2"})
+    public void testResumeSubmittedProcessOnBothColosWhen1ColoIsDown()
+        throws Exception {
+        try {
+            bundles[0].submitProcess(true);
+            bundles[1].submitProcess(true);
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            //the processes were only submitted, so every resume is expected to fail
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+            AssertUtil.assertFailed(
+                cluster1.getProcessHelper()
+                    .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+            );
+        } catch (Exception e) {
+            e.printStackTrace();
+            //wrap the exception itself: getCause() is null unless the exception was chained,
+            //and wrapping null would hide the real failure from the TestNG report
+            throw new TestNGException(e);
+        } finally {
+            Util.restartService(cluster2.getProcessHelper());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
new file mode 100644
index 0000000..319363c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+
+public class PrismProcessScheduleTest extends BaseTestClass {
+
+    // Colo helpers for the first and second entries of the inherited servers list.
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    // Oozie clients taken from the same positions of the inherited serverOC list.
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    // Upload target of uploadWorkflow() and the workflow path that setUp() assigns to each bundle.
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator";
+    // Class-wide log4j logger; setUp() uses it to log the name of the test about to run.
+    private static final Logger logger = Logger.getLogger(PrismProcessScheduleTest.class);
+
+    /**
+     * Runs once before the tests of this class: passes the aggregator workflow directory and
+     * OSUtil.RESOURCES_OOZIE to uploadDirToClusters.
+     *
+     * @throws Exception if the upload fails
+     */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        this.uploadDirToClusters(this.aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /**
+     * Runs before every test: logs the test name and rebuilds bundles[0] and bundles[1] from
+     * the late-data bundle, one per server, each made unique and pointed at the aggregator
+     * workflow directory.
+     *
+     * @param method the test method about to run; only its name is used
+     * @throws Exception if the bundle cannot be read or prepared
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        final Bundle template = BundleUtil.readLateDataBundle();
+        for (int serverIndex = 0; serverIndex < 2; serverIndex++) {
+            final Bundle prepared = new Bundle(template, servers.get(serverIndex));
+            bundles[serverIndex] = prepared;
+            prepared.generateUniqueBundle();
+            prepared.setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    /**
+     * Runs after every test and delegates the cleanup to removeBundles().
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        this.removeBundles();
+    }
+
+    /**
+     * Schedules the first process and checks that it, and not the second one, is running on
+     * cluster1. Then schedules the second process and checks that it, and not the first one,
+     * is running on cluster2.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessScheduleOnBothColos() throws Exception {
+        final EntityType process = EntityType.PROCESS;
+        final Job.Status running = Job.Status.RUNNING;
+
+        //first bundle: must be running on cluster1, second bundle must not
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], running);
+        AssertUtil.checkNotStatus(cluster1OC, process, bundles[1], running);
+
+        //second bundle: must be running on cluster2
+        bundles[1].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], running);
+
+        //no criss cross: the first bundle must not be running on cluster2
+        AssertUtil.checkNotStatus(cluster2OC, process, bundles[0], running);
+    }
+
+    /**
+     * Schedules both processes and checks that each one is running only on its own cluster.
+     * Then issues a schedule request for each of them again through the colo helpers and
+     * checks that the requests succeed and both processes are still running.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testScheduleAlreadyScheduledProcessOnBothColos() throws Exception {
+        final EntityType process = EntityType.PROCESS;
+        final Job.Status running = Job.Status.RUNNING;
+
+        //schedule both bundles
+        bundles[0].submitAndScheduleProcess();
+        bundles[1].submitAndScheduleProcess();
+
+        //each process must be running on its own cluster
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], running);
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], running);
+
+        //and must not be running on the other one
+        AssertUtil.checkNotStatus(cluster1OC, process, bundles[1], running);
+        AssertUtil.checkNotStatus(cluster2OC, process, bundles[0], running);
+
+        //schedule the already scheduled processes once more
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(
+            cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+        //the repeated requests must not have changed the statuses
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], running);
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], running);
+    }
+
+    /**
+     * Schedules both processes. Suspends the first one, checks that a further schedule request
+     * succeeds but leaves it suspended on cluster1, and resumes it. Then does the same for the
+     * second process on cluster2, checking in between that the first process stays running.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testScheduleSuspendedProcessOnBothColos() throws Exception {
+        final EntityType process = EntityType.PROCESS;
+        final Job.Status running = Job.Status.RUNNING;
+        final Job.Status suspended = Job.Status.SUSPENDED;
+
+        //schedule both bundles
+        bundles[0].submitAndScheduleProcess();
+        bundles[1].submitAndScheduleProcess();
+
+        //suspend the first process; the second one must keep running
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], suspended);
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], running);
+
+        //scheduling the suspended process succeeds but must leave it suspended
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], suspended);
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], running);
+
+        //suspend the second process; the first one must keep running
+        AssertUtil.assertSucceeded(
+            cluster1.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], suspended);
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], running);
+
+        //scheduling the suspended process succeeds but must leave it suspended
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], suspended);
+        AssertUtil.assertSucceeded(
+            cluster2.getProcessHelper().resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], running);
+    }
+
+    /**
+     * Schedules both processes, then deletes them one after another through prism, checking
+     * the statuses on both clusters after each deletion. Finally checks that schedule requests
+     * for the deleted processes fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testScheduleDeletedProcessOnBothColos() throws Exception {
+        final EntityType process = EntityType.PROCESS;
+        final Job.Status killed = Job.Status.KILLED;
+
+        //schedule both bundles
+        bundles[0].submitAndScheduleProcess();
+        bundles[1].submitAndScheduleProcess();
+
+        //delete the first process; the second one must keep running
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], killed);
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], Job.Status.RUNNING);
+
+        //delete the second process; now both must be killed
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, process, bundles[0], killed);
+        AssertUtil.checkStatus(cluster2OC, process, bundles[1], killed);
+
+        //scheduling a deleted process must fail
+        AssertUtil.assertFailed(
+            cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertFailed(
+            cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+    }
+
+    /**
+     * Checks that schedule requests issued through either colo helper fail for processes that
+     * have not been submitted in this test.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testScheduleNonExistentProcessOnBothColos() throws Exception {
+        //nothing has been submitted, so both requests must be rejected
+        AssertUtil.assertFailed(
+            cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertFailed(
+            cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+    }
+
+    /**
+     * Submit process which has colo-2 in it definition through prism. Shutdown falcon on colo-2.
+     * Submit and schedule the same process through prism. Check that mentioned process is running
+     * on colo-2.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testProcessScheduleOn1ColoWhileOtherColoIsDown() throws Exception {
+        try {
+            bundles[1].submitProcess(true);
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            AssertUtil.assertSucceeded(prism.getProcessHelper()
+                .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+
+            //now check if they have been scheduled correctly or not
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+            //check if there is no criss cross
+            AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getMessage());
+        } finally {
+            Util.restartService(cluster2.getProcessHelper());
+        }
+    }
+
+    /**
+     * Submit process through prism. Shutdown a colo. Try to schedule process though prism.
+     * Process shouldn't be scheduled on that colo.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testProcessScheduleOn1ColoWhileThatColoIsDown() throws Exception {
+        try {
+            bundles[0].submitProcess(true);
+
+            Util.shutDownService(cluster2.getProcessHelper());
+
+            AssertUtil.assertFailed(prism.getProcessHelper()
+                .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+            AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getMessage());
+        } finally {
+            Util.restartService(cluster2.getProcessHelper());
+        }
+
+    }
+
+    /**
+     * Submit and schedule process. Suspend it. Submit and schedule another process on another
+     * colo. Check that first process is suspended and the second is running both on matching
+     * colos.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessScheduleOn1ColoWhileAnotherColoHasSuspendedProcess()
+        throws Exception {
+        try {
+            bundles[0].submitAndScheduleProcess();
+            AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+                .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+            AssertUtil
+                .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+
+            bundles[1].submitAndScheduleProcess();
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+            AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil
+                .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+            AssertUtil
+                .checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getMessage());
+        }
+
+    }
+
+    /**
+     * Schedule process on one colo. Kill it. Schedule process on another colo. Check that
+     * processes were scheduled on appropriate colos and have expected statuses killed
+     * and running respectively.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessScheduleOn1ColoWhileAnotherColoHasKilledProcess()
+        throws Exception {
+        try {
+            bundles[0].submitAndScheduleProcess();
+            AssertUtil.assertSucceeded(prism.getProcessHelper()
+                .delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+
+            bundles[1].submitAndScheduleProcess();
+            AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+            AssertUtil
+                .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+            AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+            AssertUtil
+                .checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getMessage());
+        }
+    }
+
+    /**
+     * Edits the process workflow.xml through a HadoopFileEditor (presumably so that the
+     * workflow fails - TODO confirm), schedules the process and waits until its bundle is
+     * killed. Then deletes the process, submits and schedules it again and verifies that a new
+     * bundle was created. The edited workflow file is restored afterwards in any case.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000)
+    public void testRescheduleKilledProcess() throws Exception {
+        // add test data generator pending
+
+        bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
+            TimeUtil.getTimeWrtSystemTime(1));
+
+        final String extraProperty = "<property>\n"
+            + "                    <name>randomProp</name>\n"
+            + "                    <value>randomValue</value>\n"
+            + "                </property>";
+
+        HadoopFileEditor workflowEditor = null;
+        try {
+            workflowEditor = new HadoopFileEditor(cluster1.getClusterHelper().getHadoopFS());
+
+            final String workflowXml =
+                new ProcessMerlin(bundles[0].getProcessData()).getWorkflow().getPath()
+                    + "/workflow.xml";
+            workflowEditor.edit(workflowXml, "<value>${outputData}</value>", extraProperty);
+
+            bundles[0].submitFeedsScheduleProcess(prism);
+
+            //wait until the bundle of the scheduled process gets killed
+            InstanceUtil.waitForBundleToReachState(
+                cluster1, Util.readEntityName(bundles[0].getProcessData()), Job.Status.KILLED);
+
+            //remember the killed bundle to compare against after rescheduling
+            final String oldBundleID = InstanceUtil.getLatestBundleID(
+                cluster1, Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+
+            bundles[0].submitAndScheduleProcess();
+
+            OozieUtil.verifyNewBundleCreation(
+                cluster1, oldBundleID, new ArrayList<String>(), bundles[0].getProcessData(),
+                true, false);
+        } finally {
+            //the editor stays null when its construction failed; nothing to restore then
+            if (workflowEditor != null) {
+                workflowEditor.restore();
+            }
+        }
+    }
+}