You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:06 UTC
[07/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
new file mode 100644
index 0000000..1fe768a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
@@ -0,0 +1,1009 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismProcessDeleteTest extends BaseTestClass {
+
+ Bundle bundle;
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessDeleteTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismProcessDeleteTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ bundle = BundleUtil.readLateDataBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /* NOTE: All test cases assume that there are two entities scheduled in each colo
+ com.inmobi.qa.airavatqa.prism.PrismProcessDeleteTest
+ .testUA1ProcessDeleteAlreadyDeletedProcess */
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteInBothColos() throws Exception {
+ //now submit the thing to prism
+ bundles[0].submitFeedsScheduleProcess();
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteWhen1ColoIsDown() throws Exception {
+ try {
+ //now submit the thing to prism
+ bundles[0].submitFeedsScheduleProcess();
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+
+ //bring down UA2 colo :P
+ Util.shutDownService(cluster2.getClusterHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA2ArchiveStore);
+
+ //bring service up
+ Util.startService(cluster2.getProcessHelper());
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+
+ compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+ compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+ clusterName);
+ } catch (Exception e) {
+ logger.info(e.getMessage());
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getClusterHelper());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteAlreadyDeletedProcess() throws Exception {
+ try {
+ //now submit the thing to prism
+ bundles[0].submitFeedsScheduleProcess();
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+ } catch (Exception e) {
+ logger.info(e.getMessage());
+ throw new TestNGException(e.getMessage());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteTwiceWhen1ColoIsDownDuring1stDelete()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+
+ Util.shutDownService(cluster2.getClusterHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //start up service
+ Util.startService(cluster2.getClusterHelper());
+
+ //delete again
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //get final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+ //UA1:
+ compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+ } catch (Exception e) {
+ logger.info(e.getMessage());
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getClusterHelper());
+ }
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteNonExistent() throws Exception {
+ try {
+ //now lets get the final states
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //delete
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //get final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+ } catch (Exception e) {
+ logger.info(e.getMessage());
+ throw new TestNGException(e.getMessage());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testUA1ProcessDeleteNonExistentWhen1ColoIsDownDuringDelete()
+ throws Exception {
+ try {
+ //now lets get the final states
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //bring down UA1
+ Util.shutDownService(cluster2.getClusterHelper());
+
+ //delete
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ //get final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ Util.startService(cluster2.getClusterHelper());
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ } catch (Exception e) {
+ logger.info(e.getMessage());
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getClusterHelper());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessScheduledInOneColo() throws Exception {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessSuspendedInOneColo() throws Exception {
+ //create a UA1 bundle
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //suspend UA1 colo thingy
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundles[0].getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessSuspendedInOneColoWhileBothProcessesAreSuspended()
+ throws Exception {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //suspend UA1 colo thingy
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName);
+ compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessSuspendedInOneColoWhileThatColoIsDown()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+ );
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //shutdown UA1
+ Util.shutDownService(cluster2.getFeedHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessScheduledInOneColoWhileThatColoIsDown()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //shutdown UA1
+ Util.shutDownService(cluster2.getFeedHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundles[0].getProcessData());
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ //UA2:
+ compareDataStoresForEquality(initialUA2Store, finalUA2Store);
+ compareDataStoresForEquality(finalUA2ArchiveStore, initialUA2ArchiveStore);
+
+ Util.startService(cluster2.getClusterHelper());
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+ compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+ compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+ clusterName);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessSuspendedInOneColoWhileAnotherColoIsDown()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //now submit the thing to prism
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+ );
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //shutdown UA1
+ Util.shutDownService(cluster2.getFeedHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+ );
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ //UA2:
+ compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessSuspendedInOneColoWhileAnotherColoIsDownWithFeedSuspended()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //now submit the thing to prism
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+ );
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //shutdown UA1
+ Util.shutDownService(cluster2.getFeedHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+ );
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundle.getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ //UA2:
+ compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+ @Test(groups = {"prism", "0.2"})
+ public void testDeleteProcessScheduledInOneColoWhileAnotherColoIsDown()
+ throws Exception {
+ try {
+ bundles[0].submitFeedsScheduleProcess();
+ bundles[1].submitFeedsScheduleProcess();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> initialUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> initialUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> initialUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //shutdown UA1
+ Util.shutDownService(cluster2.getFeedHelper());
+
+ //lets now delete the cluster from both colos
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+ );
+
+ //now lets get the final states
+ List<String> finalPrismStore = prism.getProcessHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getProcessHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalUA1Store = cluster2.getProcessHelper().getStoreInfo();
+ List<String> finalUA1ArchiveStore = cluster2.getProcessHelper().getArchiveInfo();
+
+ List<String> finalUA2Store = cluster1.getProcessHelper().getStoreInfo();
+ List<String> finalUA2ArchiveStore = cluster1.getProcessHelper().getArchiveInfo();
+
+ //now ensure that data has been deleted from all cluster store and is present in the
+ // cluster archives
+
+ String clusterName = Util.readEntityName(bundles[1].getProcessData());
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //UA1:
+ compareDataStoresForEquality(initialUA1Store, finalUA1Store);
+ compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore);
+
+ //UA2:
+ compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName);
+ compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName);
+
+
+ Util.startService(cluster2.getClusterHelper());
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS);
+
+ clusterName = Util.readEntityName(bundles[0].getProcessData());
+
+ compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store"));
+ compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive"));
+
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+
+ compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName);
+ compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore,
+ clusterName);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+
+ private void compareDataStoreStates(List<String> initialState, List<String> finalState,
+ String filename) {
+
+ List<String> temp = new ArrayList<String>(initialState);
+ temp.removeAll(finalState);
+ Assert.assertEquals(temp.size(), 1);
+ Assert.assertTrue(temp.get(0).contains(filename));
+
+ }
+
+
+ private void compareDataStoresForEquality(List<String> store1, List<String> store2) {
+ Assert.assertTrue(Arrays.deepEquals(store2.toArray(new String[store2.size()]),
+ store1.toArray(new String[store1.size()])));
+ }
+
+ public HashMap<String, List<String>> getSystemState(EntityType entityType) throws Exception {
+ IEntityManagerHelper prizm = prism.getClusterHelper();
+ IEntityManagerHelper ua1 = cluster2.getClusterHelper();
+ IEntityManagerHelper ua2 = cluster1.getClusterHelper();
+
+ if (entityType == EntityType.FEED) {
+ prizm = prism.getFeedHelper();
+ ua1 = cluster2.getFeedHelper();
+ ua2 = cluster1.getFeedHelper();
+ }
+
+ if (entityType == EntityType.PROCESS) {
+ prizm = prism.getProcessHelper();
+ ua1 = cluster2.getProcessHelper();
+ ua2 = cluster1.getProcessHelper();
+ }
+
+ HashMap<String, List<String>> temp = new HashMap<String, List<String>>();
+ temp.put("prismArchive", prizm.getArchiveInfo());
+ temp.put("prismStore", prizm.getStoreInfo());
+ temp.put("ua1Archive", ua1.getArchiveInfo());
+ temp.put("ua1Store", ua1.getStoreInfo());
+ temp.put("ua2Archive", ua2.getArchiveInfo());
+ temp.put("ua2Store", ua2.getStoreInfo());
+
+ return temp;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
new file mode 100644
index 0000000..2e0fbaf
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+@Test(groups = "distributed")
+public class PrismProcessResumeTest extends BaseTestClass {
+
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessResumeTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismProcessResumeTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readLateDataBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Schedule process. Suspend/resume it one by one. Check that process really suspended/resumed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeSuspendedFeedOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+ //suspend using prism
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //resume using prism
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //suspend using the colohelper
+ AssertUtil.assertSucceeded(
+ cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+ );
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //resume using colohelper
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ cluster1.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+ );
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+ //resume using colohelper
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ /**
+ * Schedule processes, remove them. Try to resume them using colo-helpers and through prism.
+ * Attempt to -resume process which was removed should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeDeletedProcessOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+ //delete using prism
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ //try to resume it through prism
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //delete using prism
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData()));
+
+ //try to resume it through prism
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+
+ //try to resume process through colohelper
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ //try to resume process through colohelper
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+ }
+
+ /**
+ * Schedule processes. One by one suspend them and then resume. Then try to resume them once
+ * more.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeResumedProcessOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+
+ for (int i = 0; i < 2; i++) {
+ //resume suspended process using prism
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+ for (int i = 0; i < 2; i++) {
+ //resume resumed process
+ AssertUtil.assertSucceeded(
+ cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData())
+ );
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ //resume on the other one
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+ );
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ //resume another resumed process
+ AssertUtil.assertSucceeded(
+ cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+ );
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+ }
+
+ /**
+ * Attempt to resume non-existent process should fail through both prism and colohelpers.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testResumeNonExistentProcessOnBothColos() throws Exception {
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ }
+
+ /**
+ * Attempt to resume process which wasn't submitted should fail.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testResumeSubmittedProcessOnBothColos() throws Exception {
+ bundles[0].submitProcess(true);
+ bundles[1].submitProcess(true);
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+
+ }
+
+ /**
+ * Schedule processes on both servers and then suspend them. Shutdown server. Check that it's
+ * impossible to resume process on this server and possible on another server.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeScheduledProcessOnBothColosWhen1ColoIsDown()
+ throws Exception {
+ try {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+ AssertUtil.assertSucceeded(
+ cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+ );
+ AssertUtil.assertSucceeded(
+ cluster1.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData())
+ );
+
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+ //resume on the other one
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getCause());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+
+ }
+
+ /**
+ * Schedule processes on both servers. Remove process form one of them. Shutdown server.
+ * Check that it's impossible to resume process on that server. Then remove another process
+ * from another server. Check the same.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeDeletedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ try {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+ //delete using coloHelpers
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getProcessData())
+ );
+
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ //try to resume using prism
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //try to resume using colohelper
+ AssertUtil.assertFailed(
+ cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData())
+ );
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getProcessData())
+ );
+ //suspend on the other one
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+
+ AssertUtil.assertFailed(
+ cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+ );
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getCause());
+ } finally {
+ Util.restartService(cluster2.getFeedHelper());
+ }
+ }
+
+ /**
+ * Schedule processes on both servers. Suspend process on one server. Resume it. Shutdown
+ * this server. Try to resume that process once more. Attempt should fail. Then suspend
+ * process on another server. Resume it. Try to resume it once more. Should succeed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeResumedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ try {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster2);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster1);
+
+ //suspend using prism
+ AssertUtil.assertSucceeded(
+ cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData())
+ );
+ //verify
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(
+ cluster2.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+
+ for (int i = 0; i < 2; i++) {
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getCause());
+ } finally {
+ Util.restartService(cluster2.getProcessHelper());
+ }
+ }
+
+ /**
+ * Shutdown one of the server. Attempt to resume non-existent process on both servers should
+ * fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeNonExistentProcessOnBothColosWhen1ColoIsDown()
+ throws Exception {
+ try {
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(
+ cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getCause());
+ } finally {
+ Util.restartService(cluster2.getProcessHelper());
+ }
+ }
+
+ /**
+ * Submit processes on both servers. Shutdown one server. Attempt to resume non-scheduled
+ * process ob both servers should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2"})
+ public void testResumeSubmittedProcessOnBothColosWhen1ColoIsDown()
+ throws Exception {
+ try {
+ bundles[0].submitProcess(true);
+ bundles[1].submitProcess(true);
+
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.assertFailed(
+ cluster1.getProcessHelper()
+ .resume(Util.URLS.RESUME_URL, bundles[1].getProcessData())
+ );
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getCause());
+ } finally {
+ Util.restartService(cluster2.getProcessHelper());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
new file mode 100644
index 0000000..319363c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+
+public class PrismProcessScheduleTest extends BaseTestClass {
+
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismProcessScheduleTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readLateDataBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Schedules first process on colo-1. Schedule second process on colo-2. Check that first
+ * process hasn't been scheduled on colo-2.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessScheduleOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ bundles[1].submitAndScheduleProcess();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ }
+
+ /**
+ * Schedule first process on colo-1 and second one on colo-2. Then try to schedule them once
+ * more on the same colos. Check that request succeed and process status hasn't been changed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testScheduleAlreadyScheduledProcessOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ bundles[1].submitAndScheduleProcess();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ }
+
+ /**
+ * Schedule two processes on two different colos. Suspend process on first colo.
+ * Try to schedule first process once more. Check that its status didn't change. Resume that
+ * process. Suspend process on colo-2. Check that process on colo-1 is running and process on
+ * colo-2 is suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testScheduleSuspendedProcessOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ bundles[1].submitAndScheduleProcess();
+
+ //suspend process on colo-1
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //now check if it has been scheduled correctly or not
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ //suspend process on colo-2
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ //now check if it has been scheduled correctly or not
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ /**
+ * Schedule two processes on different colos. Delete both of them. Try to schedule them once
+ * more. Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testScheduleDeletedProcessOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ bundles[1].submitAndScheduleProcess();
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+ }
+
+ /**
+ * Attempt to schedule non-submitted process should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testScheduleNonExistentProcessOnBothColos() throws Exception {
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+ }
+
+ /**
+ * Submit process which has colo-2 in it definition through prism. Shutdown falcon on colo-2.
+ * Submit and schedule the same process through prism. Check that mentioned process is running
+ * on colo-2.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testProcessScheduleOn1ColoWhileOtherColoIsDown() throws Exception {
+ try {
+ bundles[1].submitProcess(true);
+
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //check if there is no criss cross
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getProcessHelper());
+ }
+ }
+
+ /**
+ * Submit process through prism. Shutdown a colo. Try to schedule process though prism.
+ * Process shouldn't be scheduled on that colo.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testProcessScheduleOn1ColoWhileThatColoIsDown() throws Exception {
+ try {
+ bundles[0].submitProcess(true);
+
+ Util.shutDownService(cluster2.getProcessHelper());
+
+ AssertUtil.assertFailed(prism.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ } finally {
+ Util.restartService(cluster2.getProcessHelper());
+ }
+
+ }
+
+ /**
+ * Submit and schedule process. Suspend it. Submit and schedule another process on another
+ * colo. Check that first process is suspended and the second is running both on matching
+ * colos.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessScheduleOn1ColoWhileAnotherColoHasSuspendedProcess()
+ throws Exception {
+ try {
+ bundles[0].submitAndScheduleProcess();
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+
+ bundles[1].submitAndScheduleProcess();
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil
+ .checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ }
+
+ }
+
+ /**
+ * Schedule process on one colo. Kill it. Schedule process on another colo. Check that
+ * processes were scheduled on appropriate colos and have expected statuses killed
+ * and running respectively.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessScheduleOn1ColoWhileAnotherColoHasKilledProcess()
+ throws Exception {
+ try {
+ bundles[0].submitAndScheduleProcess();
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+
+ bundles[1].submitAndScheduleProcess();
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil
+ .checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ }
+ }
+
+ /**
+ * Schedule process. Wait till it become killed. Remove it. Submit and schedule it again.
+ * Check that process was scheduled with new bundle associated to it.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000)
+ public void testRescheduleKilledProcess() throws Exception {
+
+ /*
+ add test data generator pending
+ */
+
+ bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
+ TimeUtil.getTimeWrtSystemTime(1));
+ HadoopFileEditor hadoopFileEditor = null;
+ try {
+
+ hadoopFileEditor = new HadoopFileEditor(cluster1
+ .getClusterHelper().getHadoopFS());
+
+ hadoopFileEditor.edit(new ProcessMerlin(bundles[0]
+ .getProcessData()).getWorkflow().getPath() + "/workflow.xml",
+ "<value>${outputData}</value>",
+ "<property>\n" +
+ " <name>randomProp</name>\n" +
+ " <value>randomValue</value>\n" +
+ " </property>");
+
+ bundles[0].submitFeedsScheduleProcess(prism);
+
+ InstanceUtil.waitForBundleToReachState(cluster1,
+ Util.readEntityName(bundles[0].getProcessData()),
+ org.apache.oozie.client.Job.Status.KILLED);
+
+ String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
+ Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
+
+ prism.getProcessHelper().delete(URLS.DELETE_URL,
+ bundles[0].getProcessData());
+
+ bundles[0].submitAndScheduleProcess();
+
+ OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID,
+ new ArrayList<String>(),
+ bundles[0].getProcessData(), true,
+ false);
+ } finally {
+
+ if (hadoopFileEditor != null) {
+ hadoopFileEditor.restore();
+ }
+ }
+ }
+}