You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:05 UTC

[06/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
new file mode 100644
index 0000000..efa5b8f
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Regression tests for submit-and-schedule (SnS) of process entities in a two-colo setup:
+ * bundles[0] is built for the first server and bundles[1] for the second one.
+ */
+public class PrismProcessSnSTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismProcessSnSTest.class);
+
+    /** Uploads OSUtil.RESOURCES_OOZIE to aggregateWorkflowDir using uploadDirToClusters. */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /** Logs the test name and builds one unique late-data bundle per server. */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readLateDataBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    /** Calls removeBundles() after every test method. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /** Schedules one process per colo and verifies that each runs only on its own colo. */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessSnSOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        bundles[1].submitAndScheduleProcess();
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //log the status reported by prism, then check that there is no criss cross
+        ServiceResponse response = prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, bundles[1].getProcessData());
+        logger.info(response.getMessage());
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    /**
+     * Submits each process first, then schedules it through prism, and verifies that each
+     * process is RUNNING only on the oozie client of its own bundle's colo.
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception {
+        bundles[0].submitProcess(true);
+        scheduleViaPrism(0);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //the second process has to be scheduled too, otherwise its colo is never exercised
+        bundles[1].submitProcess(true);
+        scheduleViaPrism(1);
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    /**
+     * Schedules the first process, checks that submitting the second one leaves the first colo
+     * unchanged, then schedules the second one through prism and checks for criss-cross.
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper() throws Exception {
+        bundles[0].submitProcess(true);
+        scheduleViaPrism(0);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //submitting alone must not change what is running on the first colo
+        bundles[1].submitProcess(true);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //NOTE(review): second submit of the same process - presumably an idempotency check
+        bundles[1].submitProcess(true);
+        scheduleViaPrism(1);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    /**
+     * Schedules both processes, then schedules bundles[0] again through the second colo's
+     * helper and checks that a single oozie bundle is reported for it by that colo.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testProcessSnSAlreadyScheduledOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcess();
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        bundles[1].submitAndScheduleProcess();
+
+        //now check if they have been scheduled correctly or not
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //check if there is no criss cross
+        AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+        //reschedule trial
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+            .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+        Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
+            Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+        AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+    }
+
+    /**
+     * Suspends a scheduled process, submits-and-schedules it again through prism and checks
+     * that a single oozie bundle is reported for it; done for both processes.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSnSSuspendedProcessOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcess();
+        bundles[1].submitAndScheduleProcess();
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        //submit-and-schedule the suspended process again
+        scheduleViaPrism(0);
+        Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
+            Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+        //NOTE(review): resume() is called with SUSPEND_URL - confirm whether that is intended
+        AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+            .resume(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+        AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+            .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+        scheduleViaPrism(1);
+        Assert.assertEquals(OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+            Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS).size(), 1);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+    }
+
+    /**
+     * Schedules both processes, deletes them through prism and verifies that a subsequent
+     * submit-and-schedule makes prism report each of them as RUNNING on its own colo again.
+     */
+    @Test(groups = {"prism", "0.2", "embedded"})
+    public void testSnSDeletedProcessOnBothColos() throws Exception {
+        final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING";
+        final String cluster2Running = cluster2.getClusterHelper().getColoName() + "/RUNNING";
+
+        bundles[0].submitAndScheduleProcess();
+        Assert.assertEquals(prismStatus(0), cluster1Running);
+        bundles[1].submitAndScheduleProcess();
+        Assert.assertEquals(prismStatus(1), cluster2Running);
+
+        //delete the first process; the second one must keep running
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+
+        //submit and schedule the deleted processes again
+        scheduleViaPrism(0);
+        scheduleViaPrism(1);
+        Assert.assertEquals(prismStatus(0), cluster1Running);
+        Assert.assertEquals(prismStatus(1), cluster2Running);
+    }
+
+    /**
+     * Calls submit-and-schedule on each colo directly with the other bundle's process, without
+     * submitting anything beforehand, and expects status code 404.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testScheduleNonExistentProcessOnBothColos() throws Exception {
+        Assert.assertEquals(Util.parseResponse(cluster2.getProcessHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()))
+            .getStatusCode(), 404);
+        Assert.assertEquals(Util.parseResponse(cluster1.getProcessHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()))
+            .getStatusCode(), 404);
+    }
+
+    /** Submits and schedules the process of bundles[idx] through prism, asserting success. */
+    private void scheduleViaPrism(int idx) throws Exception {
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[idx].getProcessData()));
+    }
+
+    /** Returns the status message that prism reports for the process of bundles[idx]. */
+    private String prismStatus(int idx) throws Exception {
+        return Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, bundles[idx].getProcessData())).getMessage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
new file mode 100644
index 0000000..e77ae13
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Regression tests for suspending process entities through prism in a two-colo setup,
+ * including cases that run after Util.shutDownService has been called for the first colo.
+ */
+public class PrismProcessSuspendTest extends BaseTestClass {
+
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster1OC = serverOC.get(0);
+    OozieClient cluster2OC = serverOC.get(1);
+    private boolean restartRequired; //set by tests that shut cluster1 down, read in tearDown
+    String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSuspendTest/aggregator";
+    private static final Logger logger = Logger.getLogger(PrismProcessSuspendTest.class);
+
+    /** Uploads OSUtil.RESOURCES_OOZIE to aggregateWorkflowDir using uploadDirToClusters. */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /** Logs the test name, clears the restart flag and builds one unique bundle per server. */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        restartRequired = false;
+        Bundle bundle = BundleUtil.readLateDataBundle();
+        for (int i = 0; i < 2; i++) {
+            bundles[i] = new Bundle(bundle, servers.get(i));
+            bundles[i].generateUniqueBundle();
+            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+        }
+    }
+
+    /**
+     * Restarts the service of the first colo if the test asked for it, and always calls
+     * removeBundles() afterwards, even when the restart throws.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        try {
+            if (restartRequired) {
+                Util.restartService(cluster1.getProcessHelper());
+            }
+        } finally {
+            removeBundles();
+        }
+    }
+
+    /**
+     * Suspends the first process, shuts the first colo down and checks that suspending that
+     * process again fails while the second process can still be suspended, repeatedly.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendSuspendedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        //suspend using prismHelper and verify
+        suspendViaPrism(0);
+        checkStatuses(Job.Status.SUSPENDED, Job.Status.RUNNING);
+
+        Util.shutDownService(cluster1.getProcessHelper());
+        suspendViaPrismExpectingFailure(0);
+
+        for (int i = 0; i < 2; i++) {
+            //suspend on the other one
+            suspendViaPrism(1);
+            checkStatuses(Job.Status.SUSPENDED, Job.Status.SUSPENDED);
+        }
+    }
+
+    /**
+     * Suspends the first process through prism twice and checks that it stays SUSPENDED while
+     * the second process keeps RUNNING, both in oozie and in the status reported by prism.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendScheduledProcessOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        //suspend using prismHelper and verify
+        suspendViaPrism(0);
+        checkStatuses(Job.Status.SUSPENDED, Job.Status.RUNNING);
+
+        //suspend the same process once more; the statuses must not change
+        suspendViaPrism(0);
+        checkStatuses(Job.Status.SUSPENDED, Job.Status.RUNNING);
+
+        Assert.assertTrue(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())).getMessage()
+            .contains("SUSPENDED"));
+        Assert.assertTrue(Util.parseResponse(prism.getProcessHelper()
+            .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())).getMessage()
+            .contains("RUNNING"));
+    }
+
+    /** Suspending a process that was deleted through prism must fail and leave it KILLED. */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendDeletedProcessOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        //delete using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        //suspending the deleted process must fail
+        suspendViaPrismExpectingFailure(0);
+        checkStatuses(Job.Status.KILLED, Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[1].getProcessData()));
+        //suspend on the other one
+        suspendViaPrismExpectingFailure(1);
+        checkStatuses(Job.Status.KILLED, Job.Status.KILLED);
+    }
+
+    /**
+     * Suspends each process through prism twice in a row and checks after every call that only
+     * the processes suspended so far are SUSPENDED.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendSuspendedProcessOnBothColos() throws Exception {
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        for (int i = 0; i < 2; i++) {
+            //suspend using prismHelper and verify
+            suspendViaPrism(0);
+            checkStatuses(Job.Status.SUSPENDED, Job.Status.RUNNING);
+        }
+
+        for (int i = 0; i < 2; i++) {
+            //suspend on the other one
+            suspendViaPrism(1);
+            checkStatuses(Job.Status.SUSPENDED, Job.Status.SUSPENDED);
+        }
+    }
+
+    /** Suspending processes that were never submitted must fail via prism and via each colo. */
+    @Test(groups = "embedded")
+    public void testSuspendNonExistentProcessOnBothColos() throws Exception {
+        suspendViaPrismExpectingFailure(0);
+        suspendViaPrismExpectingFailure(1);
+
+        AssertUtil.assertFailed(cluster1.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.assertFailed(cluster2.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+    }
+
+    /** Suspending submitted but unscheduled processes must fail via prism and via each colo. */
+    @Test(groups = "embedded")
+    public void testSuspendSubmittedProcessOnBothColos() throws Exception {
+        bundles[0].submitProcess(true);
+        bundles[1].submitProcess(true);
+
+        suspendViaPrismExpectingFailure(0);
+        suspendViaPrismExpectingFailure(1);
+
+        AssertUtil.assertFailed(cluster1.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.assertFailed(cluster2.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+    }
+
+    /** With the first colo down, only the second colo's process can be suspended via prism. */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendScheduledProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        Util.shutDownService(cluster1.getProcessHelper());
+
+        //suspend using prismHelper and verify
+        suspendViaPrismExpectingFailure(0);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+        //suspend on the other one
+        suspendViaPrism(1);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+    }
+
+    /**
+     * Deletes the first process, shuts the first colo down and checks that suspending either
+     * process after its deletion fails and leaves it KILLED.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void testSuspendDeletedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+        bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+        bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+        //delete using prismHelper
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        Util.shutDownService(cluster1.getProcessHelper());
+
+        //suspending the deleted process must fail
+        suspendViaPrismExpectingFailure(0);
+        checkStatuses(Job.Status.KILLED, Job.Status.RUNNING);
+
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[1].getProcessData()));
+        //suspend on the other one
+        suspendViaPrismExpectingFailure(1);
+        checkStatuses(Job.Status.KILLED, Job.Status.KILLED);
+    }
+
+    /** With the first colo shut down, suspending never-submitted processes must still fail. */
+    @Test(groups = "distributed")
+    public void testSuspendNonExistentProcessOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+        Util.shutDownService(cluster1.getProcessHelper());
+
+        suspendViaPrismExpectingFailure(1);
+        suspendViaPrismExpectingFailure(0);
+
+        AssertUtil.assertFailed(cluster2.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+    }
+
+    /**
+     * With the first colo shut down, suspending submitted but unscheduled processes must fail.
+     * Despite "Feed" in the method name, only process entities are exercised here.
+     */
+    @Test(groups = "distributed")
+    public void testSuspendSubmittedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+        restartRequired = true;
+        bundles[0].submitProcess(true);
+        bundles[1].submitProcess(true);
+
+        Util.shutDownService(cluster1.getProcessHelper());
+
+        suspendViaPrismExpectingFailure(0);
+        suspendViaPrismExpectingFailure(1);
+        AssertUtil.assertFailed(cluster2.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+    }
+
+    /** Suspends the process of bundles[idx] through prism and asserts that the call succeeds. */
+    private void suspendViaPrism(int idx) throws Exception {
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[idx].getProcessData()));
+    }
+
+    /** Suspends the process of bundles[idx] through prism and asserts that the call fails. */
+    private void suspendViaPrismExpectingFailure(int idx) throws Exception {
+        AssertUtil.assertFailed(prism.getProcessHelper()
+            .suspend(Util.URLS.SUSPEND_URL, bundles[idx].getProcessData()));
+    }
+
+    /** Checks the status of bundles[0] on cluster1OC and then of bundles[1] on cluster2OC. */
+    private void checkStatuses(Job.Status first, Job.Status second) throws Exception {
+        AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], first);
+        AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], second);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
new file mode 100644
index 0000000..8156937
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -0,0 +1,602 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.util.List;
+
+/**
+ * Regression tests for submitting cluster, feed and process entities through prism,
+ * including scenarios in which prism or the cluster1 service is shut down around the
+ * submission. Most tests compare entity-store snapshots taken before and after a call.
+ */
+@Test(groups = "distributed")
+public class PrismSubmitTest extends BaseTestClass {
+
+    // first and second entries of the servers list (not declared in this class;
+    // presumably inherited from BaseTestClass)
+    ColoHelper cluster1 = servers.get(0);
+    ColoHelper cluster2 = servers.get(1);
+    // HDFS locations used by this test class
+    String baseTestDir = baseHDFSDir + "/PrismSubmitTest";
+    // alternative cluster working path used to make a resubmitted cluster definition differ
+    String randomHDFSPath = baseTestDir + "/someRandomPath";
+    // workflow directory uploaded in uploadWorkflow() and set on the bundle's process
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    // set by tests that shut a service down; tearDown() restarts services when true
+    boolean restartRequired;
+    private static final Logger logger = Logger.getLogger(PrismSubmitTest.class);
+
+    /**
+     * Uploads the resources identified by {@code OSUtil.RESOURCES_OOZIE} to
+     * {@code aggregateWorkflowDir} once before the tests of this class run.
+     */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /**
+     * Prepares a fresh bundle for cluster1 before every test and clears the restart flag.
+     *
+     * @param method the test method about to run; only its name is logged
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        restartRequired = false;
+        // read the EL bundle, then rebuild it against cluster1
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster1);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+
+    /**
+     * Restarts the prism and cluster1 services when the test flagged {@code restartRequired},
+     * then removes the bundles.
+     *
+     * <p>The steps are chained with try/finally so that a failure while restarting one
+     * service neither skips the restart of the other nor the bundle removal; otherwise a
+     * single failed restart would leave services down and entities behind for later tests.
+     *
+     * @throws Exception if a restart or the bundle removal fails
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        try {
+            if (restartRequired) {
+                try {
+                    Util.startService(prism.getFeedHelper());
+                } finally {
+                    // attempt the colo restart even if the prism restart threw
+                    Util.startService(cluster1.getFeedHelper());
+                }
+            }
+        } finally {
+            // always attempt cleanup so entities do not leak into subsequent tests
+            removeBundles();
+        }
+    }
+
+    /**
+     * Stops prism, attempts a cluster submission through it and verifies that the
+     * cluster1 store is left unchanged.
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_1prism1coloPrismdown() throws Exception {
+        restartRequired = true;
+        Util.shutDownService(prism.getClusterHelper());
+
+        List<String> storeBefore = cluster1.getClusterHelper().getStoreInfo();
+        try {
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        } catch (ConnectException refused) {
+            // NOTE(review): when no ConnectException is thrown this check is skipped and
+            // only the store comparison below guards the test.
+            final String expectedFragment =
+                "Connection to " + prism.getClusterHelper().getHostname() + " refused";
+            Assert.assertTrue(refused.getMessage().contains(expectedFragment),
+                refused.getMessage());
+        }
+        List<String> storeAfter = cluster1.getClusterHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(storeBefore, storeAfter, 0);
+    }
+
+    /**
+     * Submits a cluster, changes its working path and submits it again; both submissions
+     * must succeed and the cluster1 and cluster2 stores must not change on the resubmit.
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_resubmitDiffContent() throws Exception {
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+
+        // alter the definition so the second submission carries different content
+        bundles[0].setCLusterWorkingPath(bundles[0].getClusters().get(0), randomHDFSPath);
+        logger.info("modified cluster Data: "
+            + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After, 0);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After, 0);
+    }
+
+    /**
+     * Submits a cluster while cluster1 is down (expects PARTIAL), restarts cluster1 and
+     * submits the same cluster again (expects SUCCEEDED).
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_resubmitAlreadyPARTIALWithAllUp() throws Exception {
+        restartRequired = true;
+
+        // first attempt with one colo unavailable
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+
+        // second attempt once the colo is back
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+    }
+
+    /**
+     * Submits a cluster and two feeds, shuts cluster1 down, deletes and submits the process
+     * through prism, then restarts cluster1 and deletes the process again. Process-store
+     * snapshots of cluster1, cluster2 and prism are compared around the submit and around
+     * the final delete.
+     */
+    @Test(groups = "distributed")
+    public void submitProcess_1ColoDownAfter2FeedSubmitStartAfterProcessSubmitAnsDeleteProcess()
+        throws Exception {
+        restartRequired = true;
+        ServiceResponse r =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(12);
+
+
+        // process-store snapshots taken while cluster1 is down
+        List<String> beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+        List<String> beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+        List<String> beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData()));
+        // NOTE(review): r still refers to the second feed submission, which was asserted to
+        // contain "SUCCEEDED" above; the process submit response is not assigned to r.
+        // Confirm which response this assertion was meant to check.
+        AssertUtil.assertFailed(r);
+        List<String> afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+        List<String> afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+        List<String> afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+        // only the prism store is expected to differ (by the process, count 1)
+        AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+            Util.getProcessName(bundles[0].getProcessData()), 1);
+        AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(15);
+
+        // fresh snapshots after the restart, before the final delete
+        beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+        beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+        beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+        r = prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+        Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+        afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+        afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+        // the delete is expected to change only the prism store (count -1)
+        AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+        AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+            Util.getProcessName(bundles[0].getProcessData()), -1);
+        AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+    }
+
+    /**
+     * Submits a cluster, two feeds and a process through prism with no service shut down,
+     * checking the feed and process stores of cluster1, cluster2 and prism after each step.
+     */
+    @Test(groups = "distributed")
+    public void submitProcess_ideal() throws Exception {
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        // feed stores around the two feed submissions
+        List<String> feedsCluster1Before = cluster1.getFeedHelper().getStoreInfo();
+        List<String> feedsCluster2Before = cluster2.getFeedHelper().getStoreInfo();
+        List<String> feedsPrismBefore = prism.getFeedHelper().getStoreInfo();
+
+        response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> feedsCluster1After = cluster1.getFeedHelper().getStoreInfo();
+        List<String> feedsCluster2After = cluster2.getFeedHelper().getStoreInfo();
+        List<String> feedsPrismAfter = prism.getFeedHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(feedsCluster1Before, feedsCluster1After, 2);
+        AssertUtil.compareDataStoreStates(feedsPrismBefore, feedsPrismAfter, 2);
+        AssertUtil.compareDataStoreStates(feedsCluster2Before, feedsCluster2After, 0);
+
+        // process stores around the process submission
+        List<String> processCluster1Before = cluster1.getProcessHelper().getStoreInfo();
+        List<String> processCluster2Before = cluster2.getProcessHelper().getStoreInfo();
+        List<String> processPrismBefore = prism.getProcessHelper().getStoreInfo();
+
+        response =
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> processCluster1After = cluster1.getProcessHelper().getStoreInfo();
+        List<String> processCluster2After = cluster2.getProcessHelper().getStoreInfo();
+        List<String> processPrismAfter = prism.getProcessHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(processCluster1Before, processCluster1After,
+            Util.getProcessName(bundles[0].getProcessData()), 1);
+        AssertUtil.compareDataStoreStates(processPrismBefore, processPrismAfter,
+            Util.getProcessName(bundles[0].getProcessData()), 1);
+        AssertUtil.compareDataStoreStates(processCluster2Before, processCluster2After, 0);
+    }
+
+    /**
+     * Submits a cluster while cluster1 is down (expects PARTIAL and one new entry in the
+     * prism and cluster2 stores), then restarts cluster1 and resubmits (expects SUCCEEDED
+     * with the prism and cluster2 stores unchanged).
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_1prism1coloColoDown() throws Exception {
+        restartRequired = true;
+        Util.shutDownService(cluster1.getClusterHelper());
+
+        List<String> cluster2BeforePartial = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismBeforePartial = prism.getClusterHelper().getStoreInfo();
+
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster2AfterPartial = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismAfterPartial = prism.getClusterHelper().getStoreInfo();
+
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+        AssertUtil.compareDataStoreStates(prismBeforePartial, prismAfterPartial,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2BeforePartial, cluster2AfterPartial,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+
+        // bring the colo back and submit the same cluster again
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(10);
+
+        List<String> cluster2BeforeResubmit = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismBeforeResubmit = prism.getClusterHelper().getStoreInfo();
+
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster2AfterResubmit = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismAfterResubmit = prism.getClusterHelper().getStoreInfo();
+
+        // the resubmission is expected to succeed without touching these two stores
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        AssertUtil.compareDataStoreStates(prismBeforeResubmit, prismAfterResubmit, 0);
+        AssertUtil.compareDataStoreStates(cluster2BeforeResubmit, cluster2AfterResubmit, 0);
+    }
+
+    /**
+     * Submits and deletes a cluster, then submits it again and expects SUCCEEDED with the
+     * cluster entity appearing once in the cluster1, prism and cluster2 stores.
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_1prism1coloSubmitDeleted() throws Exception {
+        // create and remove the entity first; the responses are not checked
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismBefore = prism.getClusterHelper().getStoreInfo();
+
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismAfter = prism.getClusterHelper().getStoreInfo();
+
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(prismBefore, prismAfter,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+    }
+
+    /**
+     * Submits a process without submitting its cluster first and expects a FAILED
+     * response mentioning that something "is not registered".
+     */
+    @Test(groups = "embedded")
+    public void submitProcess_woClusterSubmit() throws Exception {
+        ServiceResponse response =
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+
+        Assert.assertTrue(response.getMessage().contains("FAILED"));
+        Assert.assertTrue(response.getMessage().contains("is not registered"));
+    }
+
+    /**
+     * Submits the cluster but no feeds, then submits the process and expects a FAILED
+     * response mentioning that something "is not registered".
+     */
+    @Test(groups = "embedded")
+    public void submitProcess_woFeedSubmit() throws Exception {
+        ServiceResponse clusterResponse =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(clusterResponse.getMessage().contains("SUCCEEDED"));
+
+        ServiceResponse processResponse =
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+        Assert.assertTrue(processResponse.getMessage().contains("FAILED"));
+        Assert.assertTrue(processResponse.getMessage().contains("is not registered"));
+    }
+
+    /**
+     * Builds a second bundle for cluster2, submits its cluster while cluster1 is down
+     * (expects PARTIAL), restarts cluster1 and submits the first bundle's cluster (expects
+     * SUCCEEDED). Store snapshots are compared after each submission.
+     */
+    @Test(groups = {"prism", "0.2", "distributed"})
+    public void submitCluster_resubmitAlreadyPARTIAL() throws Exception {
+        restartRequired = true;
+        bundles[1] = new Bundle(bundles[0], cluster2);
+        bundles[1].generateUniqueBundle();
+        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+
+        // snapshots before anything is submitted
+        List<String> cluster1Initial = cluster1.getClusterHelper().getStoreInfo();
+        List<String> prismInitial = prism.getClusterHelper().getStoreInfo();
+        List<String> cluster2Initial = cluster2.getClusterHelper().getStoreInfo();
+
+        Util.shutDownService(cluster1.getFeedHelper());
+
+        bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
+        logger.info("cluster b2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+
+        // snapshots after the partial submission
+        List<String> cluster1Partial = cluster1.getClusterHelper().getStoreInfo();
+        List<String> prismPartial = prism.getClusterHelper().getStoreInfo();
+        List<String> cluster2Partial = cluster2.getClusterHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(cluster1Partial, cluster1Initial, 0);
+        AssertUtil.compareDataStoreStates(prismInitial, prismPartial,
+            Util.readEntityName(bundles[1].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2Initial, cluster2Partial,
+            Util.readEntityName(bundles[1].getClusters().get(0)), 1);
+
+        Util.restartService(cluster1.getFeedHelper());
+
+        bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
+        logger.info("cluster b1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        // snapshots after the second submission
+        List<String> cluster1Final = cluster1.getClusterHelper().getStoreInfo();
+        List<String> prismFinal = prism.getClusterHelper().getStoreInfo();
+        List<String> cluster2Final = cluster2.getClusterHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(cluster1Partial, cluster1Final,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(prismFinal, prismPartial, 0);
+        AssertUtil.compareDataStoreStates(cluster2Final, cluster2Partial, 0);
+    }
+
+    /**
+     * Submits a cluster with cluster1 down (expects PARTIAL; cluster1 store unchanged,
+     * cluster2 store gains the entity), then restarts cluster1 and resubmits (expects
+     * SUCCEEDED; cluster1 store gains the entity, cluster2 store unchanged).
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_polarization() throws Exception {
+        restartRequired = true;
+
+        // phase 1: one colo is down during the submission
+        Util.shutDownService(cluster1.getClusterHelper());
+        List<String> cluster1BeforePartial = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2BeforePartial = cluster2.getClusterHelper().getStoreInfo();
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        List<String> cluster1AfterPartial = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2AfterPartial = cluster2.getClusterHelper().getStoreInfo();
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+        AssertUtil.compareDataStoreStates(cluster1BeforePartial, cluster1AfterPartial, 0);
+        AssertUtil.compareDataStoreStates(cluster2BeforePartial, cluster2AfterPartial,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+
+        // phase 2: resubmit after the colo is back
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+        List<String> cluster1BeforeResubmit = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2BeforeResubmit = cluster2.getClusterHelper().getStoreInfo();
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        List<String> cluster1AfterResubmit = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2AfterResubmit = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1BeforeResubmit, cluster1AfterResubmit,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2BeforeResubmit, cluster2AfterResubmit, 0);
+    }
+
+    /**
+     * Submits a cluster with cluster1 down (expects PARTIAL), restarts cluster1, changes
+     * the cluster working path and resubmits (expects SUCCEEDED; cluster1 store gains the
+     * entity, cluster2 store unchanged).
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_resubmitDiffContentPARTIAL() throws Exception {
+        restartRequired = true;
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+        // resubmit with a modified definition
+        bundles[0].setCLusterWorkingPath(bundles[0].getClusters().get(0), randomHDFSPath);
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After, 0);
+    }
+
+    /**
+     * With cluster1 down: submits a cluster (expects PARTIAL), deletes it (expects PARTIAL;
+     * cluster1 store unchanged, cluster2 store loses the entity) and submits it once more
+     * (expects PARTIAL).
+     */
+    @Test
+    public void submitCluster_PARTIALDeletedOfPARTIALSubmit() throws Exception {
+        restartRequired = true;
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+
+        List<String> cluster1BeforeDelete = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2BeforeDelete = cluster2.getClusterHelper().getStoreInfo();
+        response =
+            prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+
+        List<String> cluster1AfterDelete = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2AfterDelete = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1BeforeDelete, cluster1AfterDelete, 0);
+        AssertUtil.compareDataStoreStates(cluster2BeforeDelete, cluster2AfterDelete,
+            Util.readEntityName(bundles[0].getClusters().get(0)), -1);
+
+        // cluster1 is still down, so the resubmission is expected to be partial again
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+    }
+
+    /**
+     * Submits a cluster with all services up, shuts cluster1 down and deletes the cluster
+     * (expects PARTIAL), then restarts cluster1 and submits the cluster again (expects
+     * SUCCEEDED). The cluster1 and cluster2 stores are compared around the delete and
+     * around the resubmission.
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_submitPartialDeleted() throws Exception {
+        restartRequired = true;
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        TimeUtil.sleepSeconds(30);
+
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+
+        // delete while cluster1 is down
+        List<String> cluster1BeforeDelete = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2BeforeDelete = cluster2.getClusterHelper().getStoreInfo();
+        response =
+            prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("PARTIAL"));
+        List<String> cluster1AfterDelete = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2AfterDelete = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1BeforeDelete, cluster1AfterDelete, 0);
+        AssertUtil.compareDataStoreStates(cluster2BeforeDelete, cluster2AfterDelete,
+            Util.readEntityName(bundles[0].getClusters().get(0)), -1);
+
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+
+        // resubmit once cluster1 is back
+        List<String> cluster1BeforeResubmit = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2BeforeResubmit = cluster2.getClusterHelper().getStoreInfo();
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        List<String> cluster1AfterResubmit = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2AfterResubmit = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1BeforeResubmit, cluster1AfterResubmit, 0);
+        AssertUtil.compareDataStoreStates(cluster2BeforeResubmit, cluster2AfterResubmit,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+    }
+
+    /**
+     * Submits the same cluster twice; both submissions must succeed and the second one
+     * must leave the cluster1 and cluster2 stores unchanged.
+     */
+    @Test(groups = "embedded")
+    public void submitCluster_resubmitAlreadySucceeded() throws Exception {
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+        response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After, 0);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After, 0);
+    }
+
+    /**
+     * Submits a cluster with no service shut down and expects SUCCEEDED with the cluster
+     * entity appearing once in the cluster1, prism and cluster2 stores.
+     */
+    @Test(groups = "distributed")
+    public void submitCluster_1prism1coloAllUp() throws Exception {
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismBefore = prism.getClusterHelper().getStoreInfo();
+
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismAfter = prism.getClusterHelper().getStoreInfo();
+
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(prismBefore, prismAfter,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After,
+            Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+    }
+
+    /**
+     * Submits a cluster that was already submitted and expects SUCCEEDED with the
+     * cluster1, prism and cluster2 stores unchanged.
+     */
+    @Test(groups = "embedded")
+    public void submitCluster_1prism1coloAlreadySubmitted() throws Exception {
+        // initial submission; its response is not checked
+        prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster1Before = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2Before = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismBefore = prism.getClusterHelper().getStoreInfo();
+
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+        List<String> cluster1After = cluster1.getClusterHelper().getStoreInfo();
+        List<String> cluster2After = cluster2.getClusterHelper().getStoreInfo();
+        List<String> prismAfter = prism.getClusterHelper().getStoreInfo();
+
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"));
+        AssertUtil.compareDataStoreStates(cluster1Before, cluster1After, 0);
+        AssertUtil.compareDataStoreStates(prismBefore, prismAfter, 0);
+        AssertUtil.compareDataStoreStates(cluster2Before, cluster2After, 0);
+    }
+
+    /**
+     * Submits a cluster and the first feed, shuts cluster1 down and submits the second
+     * feed (expects FAILED, with only the prism feed store changing), then restarts
+     * cluster1 and submits the process (expects FAILED, with only the prism process store
+     * changing).
+     */
+    @Test
+    public void submitProcess_1ColoDownAfter1FeedSubmitStartAfter2feed() throws Exception {
+        restartRequired = true;
+        ServiceResponse response =
+            prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"), response.getMessage());
+
+        response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+        Assert.assertTrue(response.getMessage().contains("SUCCEEDED"), response.getMessage());
+
+        Util.shutDownService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(30);
+
+        // feed stores around the second feed submission (cluster1 is down)
+        List<String> feedsCluster1Before = cluster1.getFeedHelper().getStoreInfo();
+        List<String> feedsCluster2Before = cluster2.getFeedHelper().getStoreInfo();
+        List<String> feedsPrismBefore = prism.getFeedHelper().getStoreInfo();
+
+        response =
+            prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+        Assert.assertTrue(response.getMessage().contains("FAILED"));
+
+        List<String> feedsCluster1After = cluster1.getFeedHelper().getStoreInfo();
+        List<String> feedsCluster2After = cluster2.getFeedHelper().getStoreInfo();
+        List<String> feedsPrismAfter = prism.getFeedHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(feedsCluster1Before, feedsCluster1After, 0);
+        AssertUtil.compareDataStoreStates(feedsPrismBefore, feedsPrismAfter,
+            Util.readEntityName(bundles[0].getDataSets().get(1)), 1);
+        AssertUtil.compareDataStoreStates(feedsCluster2Before, feedsCluster2After, 0);
+
+        Util.startService(cluster1.getClusterHelper());
+        TimeUtil.sleepSeconds(15);
+
+        // process stores around the process submission (cluster1 is back up)
+        List<String> processCluster1Before = cluster1.getProcessHelper().getStoreInfo();
+        List<String> processCluster2Before = cluster2.getProcessHelper().getStoreInfo();
+        List<String> processPrismBefore = prism.getProcessHelper().getStoreInfo();
+
+        response =
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+        Assert.assertTrue(response.getMessage().contains("FAILED"), response.getMessage());
+
+        List<String> processCluster1After = cluster1.getProcessHelper().getStoreInfo();
+        List<String> processCluster2After = cluster2.getProcessHelper().getStoreInfo();
+        List<String> processPrismAfter = prism.getProcessHelper().getStoreInfo();
+
+        AssertUtil.compareDataStoreStates(processCluster1Before, processCluster1After, 0);
+        AssertUtil.compareDataStoreStates(processPrismBefore, processPrismAfter,
+            Util.getProcessName(bundles[0].getProcessData()), 1);
+        AssertUtil.compareDataStoreStates(processCluster2Before, processCluster2After, 0);
+    }
+
+    @DataProvider(name = "errorDP")
+    public Object[][] getTestData(Method m) {
+        Object[][] testData = new Object[2][1];
+        testData[0][0] = "EmptyInputTagProcess";
+        testData[1][0] = "EmptyOutputTagProcess";
+
+        return testData;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
new file mode 100644
index 0000000..5c3cf1a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+
+@Test(groups = "embedded")
+public class ProcessPartitionExpVariableTest extends BaseTestClass {
+    private static final Logger logger = Logger.getLogger(ProcessPartitionExpVariableTest.class);
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+    private String baseTestDir = baseHDFSDir + "/ProcessPartitionExpVariableTest";
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+
+    /** Uploads the local Oozie resources to the aggregator workflow directory. */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /** Prepares a unique EL bundle bound to the cluster and the uploaded workflow. */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    /** Removes the bundles and deletes the test directory if it exists. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        removeBundles();
+        HadoopUtil.deleteDirIfExists(baseTestDir, clusterFS);
+    }
+
+    /**
+     * Test case: set 1 optional and 1 compulsory input for process. Set partitions for each
+     * input as expression language variable linked with process properties. Check that process
+     * runs fine with partition provided for compulsory input as exp variable and succeeds in
+     * spite of nonexistent partition provided for optional input.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true)
+    public void ProcessPartitionExpVariableTest_OptionalCompulsoryPartition() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(-4);
+        String endTime = TimeUtil.getTimeWrtSystemTime(30);
+
+        bundles[0].generateRequiredBundle(1, 2, 1, baseTestDir, 1, startTime, endTime);
+        bundles[0].setProcessInputNames("inputData0", "inputData");
+        Property p = new Property();
+        p.setName("var1");
+        p.setValue("hardCoded");
+        bundles[0].addProcessProperty(p);
+        bundles[0].setProcessInputPartition("${var1}", "${fileTime}");
+
+        for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
+            logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
+        }
+        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitAndScheduleBundle(prism, false);
+
+        List<String> dataDates = generateDateAndOneDayAfter(
+                TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(startTime, -25)),
+                TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(endTime, 25)), 5);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir
+            + "/input1/", dataDates);
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC,
+            Util.getProcessName(bundles[0].getProcessData()), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+    }
+
+    /**
+     * Generates patterns of the form .../2014/03/06/21/57/2014-Mar-07 for every step between
+     * the two supplied dates, both ends inclusive. The trailing date of each pattern is one
+     * day after the leading one.
+     *
+     * @param startDate  first date a pattern is generated for
+     * @param endDate    last date a pattern may be generated for
+     * @param minuteSkip interval between generated dates, in minutes; values below 1 are
+     *                   treated as 1 so that the loop always advances
+     * @return list of such patterns, empty when startDate is after endDate
+     */
+    private static List<String> generateDateAndOneDayAfter(DateTime startDate, DateTime endDate,
+                                                           int minuteSkip) {
+        final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm/");
+        final DateTimeFormatter formatter2 = DateTimeFormat.forPattern("yyyy-MMM-dd");
+        logger.info("generating data between " + formatter.print(startDate) + " and " +
+            formatter.print(endDate));
+
+        // A non-positive step would never move past endDate, so clamp it to one minute.
+        final int step = Math.max(minuteSkip, 1);
+        List<String> dates = new ArrayList<String>();
+        // Start at startDate itself and stop at endDate: no skipped first step, no overshoot.
+        for (DateTime date = startDate; !date.isAfter(endDate); date = date.plusMinutes(step)) {
+            dates.add(formatter.print(date) + formatter2.print(date.plusDays(1)));
+        }
+        return dates;
+    }
+
+    //TODO: ProcessPartitionExpVariableTest_OptionalPartition()
+    //TODO: ProcessPartitionExpVariableTest_CompulsoryPartition()
+    //TODO: ProcessPartitionExpVariableTest_moreThanOnceVariable()
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
new file mode 100644
index 0000000..0e2d8eb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+@Test(groups = "embedded")
+public class RescheduleKilledProcessTest extends BaseTestClass {
+
+    private static final Logger logger = Logger.getLogger(RescheduleKilledProcessTest.class);
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    String aggregateWorkflowDir = baseHDFSDir + "/RescheduleKilledProcessTest/aggregator";
+
+    /** Uploads the Oozie resources into the aggregator workflow directory. */
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    /**
+     * Stores a fresh, uniquely named EL bundle for the cluster in bundles[0].
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    /** Cleans up the bundles used by the finished test. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Schedules the process, deletes it, then submits and schedules the same definition
+     * once more.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = false, timeOut = 1200000)
+    public void rescheduleKilledProcess() throws Exception {
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(-11);
+        final String validityEnd = TimeUtil.getTimeWrtSystemTime(6);
+        final String renamed = InstanceUtil.setProcessName(bundles[0].getProcessData(),
+            "zeroInputProcess" + new Random().nextInt());
+        final List<String> outputFeeds = new ArrayList<String>();
+        outputFeeds.add(bundles[0].getOutputFeedFromBundle());
+        final ProcessMerlin merlin = new ProcessMerlin(renamed);
+        merlin.setProcessFeeds(outputFeeds, 0, 0, 1);
+
+        String definition = InstanceUtil.setProcessCluster(merlin.toString(), null,
+            XmlUtil.createProcessValidity(validityStart, "2099-01-01T00:00Z"));
+        definition = InstanceUtil.setProcessCluster(definition,
+            Util.readEntityName(bundles[0].getClusters().get(0)),
+            XmlUtil.createProcessValidity(validityStart, validityEnd));
+        bundles[0].setProcessData(definition);
+
+        bundles[0].submitFeedsScheduleProcess(prism);
+        deleteSubmitAndSchedule();
+    }
+
+    /**
+     * Submit and schedule a process, then run the delete / submit / schedule cycle twice,
+     * asserting that every call succeeds.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true, timeOut = 1200000)
+    public void rescheduleKilledProcess02() throws Exception {
+        bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-11),
+            TimeUtil.getTimeWrtSystemTime(6));
+        bundles[0].setInputFeedDataPath(
+            baseHDFSDir + "/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+        final String feedPrefix =
+            InstanceUtil.getFeedPrefix(bundles[0].getInputFeedFromBundle());
+        HadoopUtil.deleteDirIfExists(feedPrefix.substring(1), clusterFS);
+        HadoopUtil.lateDataReplenish(clusterFS, 40, 1, feedPrefix, null);
+
+        logger.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        for (int round = 0; round < 2; round++) {
+            deleteSubmitAndSchedule();
+        }
+    }
+
+    /** Deletes the process of bundles[0] through prism, then submits and schedules it. */
+    private void deleteSubmitAndSchedule() throws Exception {
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData()));
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
new file mode 100644
index 0000000..1e37011
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+
+@Test(groups = "embedded")
+public class RescheduleProcessInFinalStatesTest extends BaseTestClass {
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    String baseTestDir = baseHDFSDir + "/RescheduleProcessInFinalStates";
+    String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    String inputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger logger = Logger.getLogger(RescheduleProcessInFinalStatesTest.class);
+
+    /** Uploads the workflow and puts input data under the freshly cleaned feed data path. */
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        logger.info("in @BeforeClass");
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        Bundle b = BundleUtil.readELBundle();
+        b.generateUniqueBundle();
+        b = new Bundle(b, cluster);
+        b.setProcessWorkflow(aggregateWorkflowDir);
+
+        String startDate = "2010-01-01T20:00Z";
+        String endDate = "2010-01-03T01:04Z";
+
+        b.setInputFeedDataPath(inputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    /**
+     * Submits the feeds and schedules a process with 5 minute periodicity, so that every test
+     * starts with a scheduled process.
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(inputPath);
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:15Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(
+            baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].submitFeedsScheduleProcess(prism);
+    }
+
+    /** Removes the bundles created for the test. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Wait till process succeed and delete it. Check that entity is absent on server. Reschedule
+     * it and check that it succeeds after some time.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true)
+    public void rescheduleSucceeded() throws Exception {
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+        deleteProcess();
+        checkNotFoundDefinition(bundles[0].getProcessData());
+        rescheduleAndWaitForSuccess();
+    }
+
+    /**
+     * Fully duplicates rescheduleSucceeded().
+     * TODO : modify test to match test case
+     * Make process run into FAILED state. Delete it and check that entity was removed.
+     * Run it again and check that process succeeds.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = false)
+    public void rescheduleFailed() throws Exception {
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+        deleteProcess();
+        checkNotFoundDefinition(bundles[0].getProcessData());
+        rescheduleAndWaitForSuccess();
+    }
+
+    /**
+     * Kill an instance so that the process gets into DONE WITH ERROR state (DONEWITHERROR in
+     * Oozie, hence DWE). Delete it. Check that entity is absent on the server. Reschedule it
+     * and check that it succeeds in some time.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true)
+    public void rescheduleDWE() throws Exception {
+        prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z");
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR);
+
+        deleteProcess();
+        checkNotFoundDefinition(bundles[0].getProcessData());
+        rescheduleAndWaitForSuccess();
+    }
+
+    /**
+     * Delete the scheduled process and wait till it gets into KILLED state. Check that entity
+     * is absent on the server. Reschedule it and check that it succeeds in some time.
+     *
+     * @throws Exception
+     */
+    @Test(enabled = true)
+    public void rescheduleKilled() throws Exception {
+        deleteProcess();
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.KILLED);
+        checkNotFoundDefinition(bundles[0].getProcessData());
+        rescheduleAndWaitForSuccess();
+    }
+
+    /** Deletes the process via prism, asserting success so a failed delete surfaces here. */
+    private void deleteProcess() throws Exception {
+        AssertUtil.assertSucceeded(
+            prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+    }
+
+    /** Submits and schedules the process again and waits till it reaches SUCCEEDED state. */
+    private void rescheduleAndWaitForSuccess() throws Exception {
+        AssertUtil.assertSucceeded(prism.getProcessHelper()
+            .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+    }
+
+    /**
+     * Tries to get entity definition and checks it is absent (-get definition should return
+     * process not found)
+     *
+     * @param process process entity definition
+     * @throws URISyntaxException
+     * @throws IOException
+     * @throws AuthenticationException
+     * @throws JAXBException
+     */
+    private void checkNotFoundDefinition(String process)
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        ServiceResponse r = prism.getProcessHelper()
+            .getEntityDefinition(URLS.GET_ENTITY_DEFINITION, process);
+        // The response text doubles as the failure message so a mismatch is self-explanatory.
+        Assert.assertTrue(r.getMessage().contains("(process) not found"), r.getMessage());
+        AssertUtil.assertFailed(r);
+    }
+}