Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:09 UTC
[10/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
new file mode 100644
index 0000000..f4dfe78
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
@@ -0,0 +1,1128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
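+/**
+ * Tests for feed deletion through prism in a distributed (two colo) setup: verifies the
+ * config store and archive contents on prism and on each server after delete attempts
+ * under various colo up/down and feed state combinations.
+ */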
+@Test(groups = "distributed")
+public class PrismFeedDeleteTest extends BaseTestClass {
+
+ private boolean restartRequired;
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ private String cluster1Colo = cluster1.getClusterHelper().getColoName();
+ private String cluster2Colo = cluster2.getClusterHelper().getColoName();
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedDeleteTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismFeedDeleteTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ restartRequired = false;
+ Bundle bundle = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundle, cluster1);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+
+ bundles[1] = new Bundle(bundle, cluster2);
+ bundles[1].generateUniqueBundle();
+ bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (restartRequired) {
+ Util.restartService(cluster1.getFeedHelper());
+ }
+ removeBundles();
+ }
+
+ /**
+ * NOTE: All test cases assume that there are two entities scheduled in each colo
+ */
+
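+ /**
+ * Submit a feed on Server1 and delete it through prism. The feed should move from the
+ * store to the archive on both prism and Server1, while Server2 stays unchanged.
+ */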
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteInBothColos() throws Exception {
+ bundles[0].submitFeed();
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //let's now delete the feed from both colos
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the feed was removed from the store and archived on the colos where it
+ // was present
+
+ String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, feedName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, feedName);
+
+ //server1:
+ compareDataStoreStates(initialServer1Store, finalServer1Store, feedName);
+ compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, feedName);
+
+ //server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+ }
+
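+ /**
+ * Submit a feed, bring Server1 down and attempt the delete through prism; it should fail
+ * and leave all stores untouched. After Server1 is brought back up, the delete should
+ * succeed and the feed should be archived on prism and Server1.
+ */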
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitFeed();
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //bring down the Server1 colo
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //attempt to delete the feed; this should fail since Server1 is down
+ AssertUtil.assertFailed(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that nothing has changed on either colo, since the delete failed
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+ Util.startService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ List<String> Server2ArchivePostUp = cluster2.getFeedHelper().getArchiveInfo();
+ List<String> Server2StorePostUp = cluster2.getFeedHelper().getStoreInfo();
+
+ List<String> Server1ArchivePostUp = cluster1.getFeedHelper().getArchiveInfo();
+ List<String> Server1StorePostUp = cluster1.getFeedHelper().getStoreInfo();
+
+ List<String> prismHelperArchivePostUp = prism.getFeedHelper().getArchiveInfo();
+ List<String> prismHelperStorePostUp = prism.getFeedHelper().getStoreInfo();
+
+ compareDataStoreStates(finalPrismStore, prismHelperStorePostUp, clusterName);
+ compareDataStoreStates(prismHelperArchivePostUp, finalPrismArchiveStore, clusterName);
+
+ compareDataStoreStates(initialServer1Store, Server1StorePostUp, clusterName);
+ compareDataStoreStates(Server1ArchivePostUp, finalServer1ArchiveStore, clusterName);
+
+ compareDataStoresForEquality(finalServer2Store, Server2StorePostUp);
+ compareDataStoresForEquality(finalServer2ArchiveStore, Server2ArchivePostUp);
+ }
+
+
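+ /**
+ * Delete a feed that has already been deleted. The second delete should succeed without
+ * changing any store or archive.
+ */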
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteAlreadyDeletedFeed() throws Exception {
+ restartRequired = true;
+ bundles[0].submitFeed();
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the stores and archives are unchanged, since the feed was already
+ // deleted
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+ }
+
+
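+ /**
+ * Attempt to delete a feed while Server1 is down; the delete should fail. Once Server1
+ * is restarted, retrying the delete should succeed and archive the feed on prism and
+ * Server1.
+ */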
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteTwiceWhen1ColoIsDownDuring1stDelete() throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitFeed();
+
+ Util.shutDownService(cluster1.getClusterHelper());
+
+ //attempt to delete the feed; this should fail since Server1 is down
+ AssertUtil.assertFailed(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //capture the store and archive state after the failed delete
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //start up service
+ Util.startService(cluster1.getFeedHelper());
+
+ //delete again
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //get final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the feed was removed from the store and archived on the colos where it
+ // was present
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+ //Server1:
+ compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+ compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+ }
+
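+ /**
+ * Delete a feed that was never submitted. The call should succeed and leave all stores
+ * and archives unchanged.
+ */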
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteNonExistent() throws Exception {
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //get final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the stores and archives are unchanged, since the feed never existed
+
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(initialPrismArchiveStore, finalPrismArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(initialServer2ArchiveStore, finalServer2ArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+ }
+
+
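+ /**
+ * Delete a feed that was never submitted while Server1 is down. The delete should still
+ * succeed, since there is nothing to remove.
+ */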
+ @Test(groups = {"multiCluster"})
+ public void testServer1FeedDeleteNonExistentWhen1ColoIsDownDuringDelete() throws Exception {
+ restartRequired = true;
+ bundles[0] = new Bundle(bundles[0], cluster1);
+ bundles[1] = new Bundle(bundles[1], cluster2);
+
+ bundles[0].setCLusterColo(cluster1Colo);
+ logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+ ServiceResponse r = prism.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ bundles[1].setCLusterColo(cluster2Colo);
+ logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ String startTimeServer1 = "2012-10-01T12:00Z";
+ String startTimeServer2 = "2012-10-01T12:00Z";
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+ "${cluster.colo}",
+ baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ baseHDFSDir +
+ "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed));
+ }
+
+
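+ /**
+ * Schedule a feed on each colo, then delete the Server1 feed through prism. The feed
+ * should be archived on prism and Server1, while Server2 stays unchanged.
+ */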
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedScheduledInOneColo() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //let's now delete the feed from both colos
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the feed was removed from the store and archived on the colos where it
+ // was present
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //Server1:
+ compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+ compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+ }
+
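+ /**
+ * Schedule a feed on each colo, suspend the Server1 feed and delete it through prism.
+ * The suspended feed should be archived on prism and Server1.
+ */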
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedSuspendedInOneColo() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //suspend the feed on the Server1 colo
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //let's now delete the feed from both colos
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the feed was removed from the store and archived on the colos where it
+ // was present
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //Server1:
+ compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+ compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+ }
+
+
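+ /**
+ * Suspend the feeds on both colos, then delete the Server1 feed through prism. The feed
+ * should be archived on prism and Server1, while Server2 stays unchanged.
+ */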
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedSuspendedInOneColoWhileBothFeedsAreSuspended() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //suspend the feeds on both colos
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //let's now delete the feed from both colos
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that the feed was removed from the store and archived on the colos where it
+ // was present
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //Server1:
+ compareDataStoreStates(initialServer1Store, finalServer1Store, clusterName);
+ compareDataStoreStates(finalServer1ArchiveStore, initialServer1ArchiveStore, clusterName);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+ }
+
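+ /**
+ * Suspend the Server1 feed, bring Server1 down and attempt the delete; it should fail
+ * and leave all stores untouched. After Server1 is restarted, the delete should succeed
+ * and archive the feed on prism and Server1.
+ */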
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedSuspendedInOneColoWhileThatColoIsDown()
+ throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //shut down Server1
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //attempt to delete the feed; this should fail since Server1 is down
+ AssertUtil.assertFailed(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that nothing has changed on either colo, since the delete failed
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+ Util.startService(cluster1.getClusterHelper());
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ List<String> Server1StorePostUp = cluster1.getFeedHelper().getStoreInfo();
+ List<String> Server1ArchivePostUp = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> Server2StorePostUp = cluster2.getFeedHelper().getStoreInfo();
+ List<String> Server2ArchivePostUp = cluster2.getFeedHelper().getArchiveInfo();
+
+ List<String> prismStorePostUp = prism.getFeedHelper().getStoreInfo();
+ List<String> prismArchivePostUp = prism.getFeedHelper().getArchiveInfo();
+
+ compareDataStoresForEquality(Server2StorePostUp, finalServer2Store);
+ compareDataStoresForEquality(Server2ArchivePostUp, finalServer2ArchiveStore);
+
+ compareDataStoreStates(finalServer1Store, Server1StorePostUp, clusterName);
+ compareDataStoreStates(Server1ArchivePostUp, finalServer1ArchiveStore, clusterName);
+
+ compareDataStoreStates(finalPrismStore, prismStorePostUp, clusterName);
+ compareDataStoreStates(prismArchivePostUp, finalPrismArchiveStore, clusterName);
+ }
+
+
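+ /**
+ * Suspend the feeds on both colos, bring Server1 down and attempt to delete its feed;
+ * the delete should fail. After Server1 is back up, the delete should succeed and the
+ * feed should be archived on prism and Server1, with Server2 unchanged.
+ */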
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedSuspendedInOneColoWhileThatColoIsDownAndOtherHasSuspendedFeed()
+ throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //shut down Server1
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //attempt to delete the feed; this should fail since Server1 is down
+ AssertUtil.assertFailed(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that nothing has changed on either colo, since the delete failed
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+ Util.startService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ HashMap<String, List<String>> finalSystemState = getSystemState(EntityType.FEED);
+
+ compareDataStoreStates(finalSystemState.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+ compareDataStoreStates(finalPrismStore, finalSystemState.get("prismStore"), clusterName);
+
+ compareDataStoreStates(finalServer1Store, finalSystemState.get("Server1Store"),
+ clusterName);
+ compareDataStoreStates(finalSystemState.get("Server1Archive"), finalServer1ArchiveStore,
+ clusterName);
+
+ compareDataStoresForEquality(finalSystemState.get("Server2Archive"),
+ finalServer2ArchiveStore);
+ compareDataStoresForEquality(finalSystemState.get("Server2Store"), finalServer2Store);
+ }
+
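+ /**
+ * Attempt to delete a scheduled feed while its colo (Server1) is down; the delete should
+ * fail. Once Server1 is restarted, the delete should succeed and archive the feed on
+ * prism and Server1.
+ */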
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedScheduledInOneColoWhileThatColoIsDown() throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //shut down Server1
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //attempt to delete the feed; this should fail since Server1 is down
+ AssertUtil.assertFailed(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //now ensure that nothing has changed on either colo, since the delete failed
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoresForEquality(initialServer2Store, finalServer2Store);
+ compareDataStoresForEquality(finalServer2ArchiveStore, initialServer2ArchiveStore);
+
+ Util.startService(cluster1.getClusterHelper());
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ HashMap<String, List<String>> systemStatePostUp = getSystemState(EntityType.FEED);
+
+ compareDataStoreStates(finalPrismStore, systemStatePostUp.get("prismStore"), clusterName);
+ compareDataStoreStates(systemStatePostUp.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+
+ compareDataStoreStates(finalServer1Store, systemStatePostUp.get("Server1Store"),
+ clusterName);
+ compareDataStoreStates(systemStatePostUp.get("Server1Archive"), finalServer1ArchiveStore,
+ clusterName);
+
+ compareDataStoresForEquality(finalServer2ArchiveStore,
+ systemStatePostUp.get("Server2Archive"));
+ compareDataStoresForEquality(finalServer2Store, systemStatePostUp.get("Server2Store"));
+ }
+
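+ /**
+ * Schedule a feed spanning both colos, bring Server1 down, then suspend and delete the
+ * feed. Both calls should be partially successful: only Server2 processes them, so the
+ * feed is archived on Server2 alone.
+ */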
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedSuspendedInOneColoWhileAnotherColoIsDown() throws Exception {
+ restartRequired = true;
+
+ bundles[0].setCLusterColo(cluster1Colo);
+ logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ bundles[1].setCLusterColo(cluster2Colo);
+ logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ String startTimeServer1 = "2012-10-01T12:00Z";
+ String startTimeServer2 = "2012-10-01T12:00Z";
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+ "${cluster.colo}",
+ baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ baseHDFSDir +
+ "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+
+ AssertUtil.assertSucceeded(r);
+
+ r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+ AssertUtil.assertSucceeded(r);
+ TimeUtil.sleepSeconds(15);
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ r = prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed);
+ TimeUtil.sleepSeconds(10);
+ AssertUtil.assertPartial(r);
+ Assert.assertTrue(
+ r.getMessage().contains(cluster1Colo + "/org.apache.falcon.FalconException")
+ && r.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+
+ ServiceResponse response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed);
+ Assert.assertTrue(
+ response.getMessage().contains(cluster1Colo + "/org.apache.falcon.FalconException")
+ && response.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+ AssertUtil.assertPartial(response);
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //ensure that the feed was removed from the Server2 store and archived there, while
+ // prism and Server1 are unchanged since Server1 was down
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(finalServer1ArchiveStore, initialServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+ compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+ }
+
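+ /**
+ * Schedule and suspend a feed spanning both colos, then bring Server1 down and delete
+ * the feed. The delete should be partially successful: the feed is archived on Server2
+ * while prism and Server1 stay unchanged.
+ */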
+ @Test(enabled = true)
+ public void testDeleteFeedSuspendedInOneColoWhileAnotherColoIsDownWithFeedSuspended()
+ throws Exception {
+ restartRequired = true;
+
+ bundles[0].setCLusterColo(cluster1Colo);
+ logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ bundles[1].setCLusterColo(cluster2Colo);
+ logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ String startTimeServer1 = "2012-10-01T12:00Z";
+ String startTimeServer2 = "2012-10-01T12:00Z";
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer1, "2099-10-01T12:10Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+ "${cluster.colo}",
+ baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeServer2, "2099-10-01T12:25Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ baseHDFSDir +
+ "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+
+ AssertUtil.assertSucceeded(r);
+
+ r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+ AssertUtil.assertSucceeded(r);
+ TimeUtil.sleepSeconds(15);
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ r = prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed);
+ TimeUtil.sleepSeconds(10);
+ AssertUtil.assertSucceeded(r);
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ ServiceResponse response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feed);
+ Assert.assertTrue(
+ response.getMessage().contains(cluster1Colo + "/org.apache.falcon.FalconException")
+ && response.getMessage().contains(cluster2Colo + "/" + Util.readEntityName(feed)));
+ AssertUtil.assertPartial(response);
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //ensure that the feed was removed from the Server2 store and archived there, while
+ // prism and Server1 are unchanged since Server1 was down
+
+ String clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+ //prism:
+ compareDataStoresForEquality(initialPrismStore, finalPrismStore);
+ compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(finalServer1ArchiveStore, initialServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+ compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+ }
+
+
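+ /**
+ * With Server1 down, delete the feed scheduled on Server2; it should succeed and be
+ * archived on prism and Server2. Once Server1 is back up, delete the Server1 feed and
+ * verify it is archived on prism and Server1.
+ */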
+ @Test(groups = {"multiCluster"})
+ public void testDeleteFeedScheduledInOneColoWhileAnotherColoIsDown() throws Exception {
+ restartRequired = true;
+
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //fetch the initial store and archive state for prism
+ List<String> initialPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> initialPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the initial store and archive for both colos
+ List<String> initialServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> initialServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> initialServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> initialServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //shut down Server1
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //delete the feed submitted on Server2; this should succeed even though Server1 is down
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+
+ //now let's get the final states
+ List<String> finalPrismStore = prism.getFeedHelper().getStoreInfo();
+ List<String> finalPrismArchiveStore = prism.getFeedHelper().getArchiveInfo();
+
+ //fetch the final store and archive for both colos
+ List<String> finalServer1Store = cluster1.getFeedHelper().getStoreInfo();
+ List<String> finalServer1ArchiveStore = cluster1.getFeedHelper().getArchiveInfo();
+
+ List<String> finalServer2Store = cluster2.getFeedHelper().getStoreInfo();
+ List<String> finalServer2ArchiveStore = cluster2.getFeedHelper().getArchiveInfo();
+
+ //ensure that the feed was removed from the prism and Server2 stores and archived,
+ // while Server1 is unchanged
+
+ String clusterName = Util.readEntityName(bundles[1].getDataSets().get(0));
+ //prism:
+ compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName);
+ compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName);
+
+ //Server1:
+ compareDataStoresForEquality(initialServer1Store, finalServer1Store);
+ compareDataStoresForEquality(initialServer1ArchiveStore, finalServer1ArchiveStore);
+
+ //Server2:
+ compareDataStoreStates(initialServer2Store, finalServer2Store, clusterName);
+ compareDataStoreStates(finalServer2ArchiveStore, initialServer2ArchiveStore, clusterName);
+
+ Util.startService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+
+ clusterName = Util.readEntityName(bundles[0].getDataSets().get(0));
+
+ HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.FEED);
+
+ compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore,
+ clusterName);
+ compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName);
+
+ compareDataStoreStates(systemPostUp.get("Server1Archive"), finalServer1ArchiveStore,
+ clusterName);
+ compareDataStoreStates(finalServer1Store, systemPostUp.get("Server1Store"), clusterName);
+
+ compareDataStoresForEquality(finalServer2ArchiveStore, systemPostUp.get("Server2Archive"));
+ compareDataStoresForEquality(finalServer2Store, systemPostUp.get("Server2Store"));
+ }
+
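+ /**
+ * Asserts that exactly one entry was removed between the initial and final store
+ * listings, and that the removed entry contains the given entity name.
+ */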
+ private void compareDataStoreStates(List<String> initialState, List<String> finalState,
+ String filename) {
+ List<String> temp = new ArrayList<String>();
+ temp.addAll(initialState);
+ temp.removeAll(finalState);
+ Assert.assertEquals(temp.size(), 1);
+ Assert.assertTrue(temp.get(0).contains(filename));
+ }
+
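+ /**
+ * Asserts that the two store listings have identical contents.
+ */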
+ private void compareDataStoresForEquality(List<String> store1, List<String> store2) {
+ Assert.assertEquals(store1.size(), store2.size(), "DataStores are not equal!");
+ Assert.assertTrue(Arrays.deepEquals(store2.toArray(new String[store2.size()]),
+ store1.toArray(new String[store1.size()])), "DataStores are not equal!");
+ }
+
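+ /**
+ * Captures the current store and archive listings of prism and both servers for the
+ * given entity type.
+ */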
+ public HashMap<String, List<String>> getSystemState(EntityType entityType) throws Exception {
+ IEntityManagerHelper prismHelper = prism.getClusterHelper();
+ IEntityManagerHelper server1Helper = cluster1.getClusterHelper();
+ IEntityManagerHelper server2Helper = cluster2.getClusterHelper();
+
+ if (entityType == EntityType.FEED) {
+ prismHelper = prism.getFeedHelper();
+ server1Helper = cluster1.getFeedHelper();
+ server2Helper = cluster2.getFeedHelper();
+ }
+
+ if (entityType == EntityType.PROCESS) {
+ prismHelper = prism.getProcessHelper();
+ server1Helper = cluster1.getProcessHelper();
+ server2Helper = cluster2.getProcessHelper();
+ }
+
+ HashMap<String, List<String>> temp = new HashMap<String, List<String>>();
+ temp.put("prismArchive", prismHelper.getArchiveInfo());
+ temp.put("prismStore", prismHelper.getStoreInfo());
+ temp.put("Server1Archive", server1Helper.getArchiveInfo());
+ temp.put("Server1Store", server1Helper.getStoreInfo());
+ temp.put("Server2Archive", server2Helper.getArchiveInfo());
+ temp.put("Server2Store", server2Helper.getStoreInfo());
+
+ return temp;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
new file mode 100644
index 0000000..77d07af
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
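+/**
+ * Tests for late data replication of a feed with multiple source clusters and a single
+ * target cluster.
+ */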
+@Test(groups = "distributed")
+public class PrismFeedLateReplicationTest extends BaseTestClass {
+
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ ColoHelper cluster3 = servers.get(2);
+ FileSystem cluster1FS = serverFS.get(0);
+ FileSystem cluster2FS = serverFS.get(1);
+ FileSystem cluster3FS = serverFS.get(2);
+ private String baseTestDir = baseHDFSDir + "/PrismFeedLateReplicationTest";
+ private String inputPath =
+ baseTestDir + "/input-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/";
+ private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismFeedLateReplicationTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readELBundle();
+ for (int i = 0; i < 3; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
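+ /**
+ * Feed with two source colos (US on cluster2, UK on cluster3) and one target (cluster1),
+ * with data already present in the past. Waits for the first replication instances on
+ * the target to succeed, then drops late data into the source folders.
+ */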
+ @Test(groups = {"multiCluster"})
+ public void multipleSourceOneTarget_pastData() throws Exception {
+
+ bundles[0].setInputFeedDataPath(inputPath);
+ Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+ String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
+ String prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+ HadoopUtil.lateDataReplenish(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
+ prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(-30);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+ "US/${cluster.colo}");
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+ null);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+ "UK/${cluster.colo}");
+
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+ TimeUtil.sleepSeconds(10);
+
+ String bundleId =
+ InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+ //wait till 1st instance of replication coord is SUCCEEDED
+ List<String> replicationCoordIDTarget = InstanceUtil
+ .getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ TimeUtil.sleepSeconds(20);
+ }
+
+ TimeUtil.sleepSeconds(15);
+
+ List<String> inputFolderListForColo1 =
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(0), 1);
+ List<String> inputFolderListForColo2 =
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(1), 1);
+
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo2);
+ }
+
+ @Test(groups = {"multiCluster"})
+ public void multipleSourceOneTarget_futureData() throws Exception {
+
+ bundles[0].setInputFeedDataPath(inputPath);
+ Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
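+ //validity starts slightly in the future (the argument is taken to be minutes), so
+ //instances only materialize after the feed has been scheduled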
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+ "US/${cluster.colo}");
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+ null);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+ "UK/${cluster.colo}");
+
+
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+ TimeUtil.sleepSeconds(10);
+
+ String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
+ String prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+ HadoopUtil.lateDataReplenish(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
+ prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
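+
+ //unlike the pastData test, input is seeded after scheduling here, since the first
+ //instance lies in the future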
+
+ TimeUtil.sleepSeconds(60);
+
+ //wait till 1st instance of replication coord is SUCCEEDED
+ String bundleId = InstanceUtil
+ .getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+ List<String> replicationCoordIDTarget = InstanceUtil.getReplicationCoordID(bundleId,
+ cluster1.getFeedHelper());
+
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ logger.info("waiting for the first replication instances to succeed");
+ TimeUtil.sleepSeconds(20);
+ }
+
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0),
+ WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed");
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0),
+ WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed");
+
+ TimeUtil.sleepSeconds(15);
+
+ List<String> inputFolderListForColo1 = InstanceUtil
+ .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+ 1);
+ List<String> inputFolderListForColo2 = InstanceUtil
+ .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1),
+ 1);
+
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo2);
+
+ //sleep till the first late rerun starts
+ TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+ //check for run id to be 1
+ Assert.assertEquals(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+ 1, "run id should be 1 after the first late rerun");
+ Assert.assertEquals(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+ 1, "run id should be 1 after the first late rerun");
+
+ //wait for the late rerun to complete
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ logger.info("waiting for the late reruns to succeed");
+ TimeUtil.sleepSeconds(20);
+ }
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0),
+ WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed");
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0),
+ WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed");
+
+ TimeUtil.sleepSeconds(30);
+
+ //put data for the second time
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+ + "2ndLateData", inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+ + "2ndLateData", inputFolderListForColo2);
+
+ //sleep till the second late rerun starts
+ TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
+
+ //check for run id to be 2
+ Assert.assertEquals(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0),
+ 2, "run id should be 2 after the second late rerun");
+ Assert.assertEquals(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0),
+ 2, "run id should be 2 after the second late rerun");
+ }
+
+ /**
+ * This test case does the following:
+ * there are two sources, ua2 and ua3.
+ * ua2 has the following part data:
+ * ua1/ua2
+ * ua2/ua2
+ * ua3/ua2
+ * <p/>
+ * ua3 has the following part data:
+ * ua1/ua3
+ * ua2/ua3
+ * ua3/ua3
+ * <p/>
+ * ua1 is the target, which in the end should have all the ua1 data.
+ * <p/>
+ * After the first instance succeeds, data is put into both sources and a late rerun
+ * should be triggered.
+ * <p/>
+ * After the first late rerun succeeds, more data is put into the sources and a second
+ * late rerun should be triggered.
+ */
+
+ @Test(groups = {"multiCluster"})
+ public void mixedTest01() throws Exception {
+
+ bundles[0].setInputFeedDataPath(inputPath);
+ Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+ "ua1/${cluster.colo}");
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+ null);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+ "ua1/${cluster.colo}");
+
+ //create data in colos
+
+ String postFix = "/ua1/ua2";
+ String prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua2/ua2";
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua3/ua2";
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ //put _SUCCESS in parent folder of UA2
+ HadoopUtil.putFileInFolderHDFS(cluster2FS, 90, 1, prefix, "_SUCCESS");
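+ //the feed in this test has no custom availability flag, so this _SUCCESS file is
+ //not required for replication to start (mixedTest02 below covers that case)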
+
+ postFix = "/ua1/ua3";
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua2/ua3";
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua3/ua3";
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ //put _SUCCESS in parent folder of UA3
+ HadoopUtil.putFileInFolderHDFS(cluster3FS, 90, 1, prefix, "_SUCCESS");
+
+ //submit and schedule feed
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+ TimeUtil.sleepSeconds(10);
+
+ //wait till 1st instance of replication coord is SUCCEEDED
+ String bundleId =
+ InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+ List<String> replicationCoordIDTarget =
+ InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ logger.info("waiting for the first replication instances to succeed");
+ TimeUtil.sleepSeconds(20);
+ }
+
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
+ "Replication job should have succeeded.");
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
+ "Replication job should have succeeded.");
+
+ TimeUtil.sleepSeconds(15);
+
+ //check that exactly the expected folders are created in ua1 (ua1/ua2 and ua1/ua3);
+ //no others should be present, and both should contain _SUCCESS
+
+ List<String> inputFolderListForColo1 = InstanceUtil
+ .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+ 1);
+ List<String> inputFolderListForColo2 = InstanceUtil
+ .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1),
+ 1);
+
+ String outPutLocation = InstanceUtil
+ .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+ 0);
+ String outPutBaseLocation = InstanceUtil
+ .getOutputFolderBaseForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(0), 0);
+
+ List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
+
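+ //only the "ua1" partition (from the "ua1/${cluster.colo}" partition expression)
+ //should have been replicated into the target's output base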
+ Assert.assertEquals(subfolders.size(), 1);
+ Assert.assertEquals(subfolders.get(0), "ua1");
+
+ Assert.assertFalse(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutBaseLocation,
+ "_SUCCESS"));
+
+ Assert.assertTrue(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutLocation, "_SUCCESS"));
+
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo2);
+
+ //sleep till late starts
+ TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+ //check for run id to be 1
+ Assert.assertTrue(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == 1
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 1,
+ "run ids should be equal to 1 after the first late rerun");
+
+ //wait for the late rerun to complete
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ logger.info("waiting for the late reruns to succeed");
+ TimeUtil.sleepSeconds(20);
+ }
+
+
+ TimeUtil.sleepSeconds(30);
+
+ //put data for the second time
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+ + "2ndLateData", inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA
+ + "2ndLateData", inputFolderListForColo2);
+
+ //sleep till the second late rerun starts
+ TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9));
+
+ //check for run id to be 2
+ Assert.assertTrue(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == 2
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 2,
+ "run ids should be equal to 2 after the second late rerun");
+ }
+
+ /**
+ * The only difference between mixedTest01 and mixedTest02 is the availability flag:
+ * here the feed has _SUCCESS as its availability flag, so replication should not
+ * start till _SUCCESS is put in ua2.
+ * <p/>
+ * This test case does the following:
+ * there are two sources, ua2 and ua3.
+ * ua2 has the following part data:
+ * ua1/ua2
+ * ua2/ua2
+ * ua3/ua2
+ * <p/>
+ * ua3 has the following part data:
+ * ua1/ua3
+ * ua2/ua3
+ * ua3/ua3
+ * <p/>
+ * ua1 is the target, which in the end should have all the ua1 data.
+ * After the first instance succeeds, data is put into both sources and a late rerun
+ * should occur; after the first late rerun succeeds, data put into only one source
+ * should not trigger another rerun.
+ */
+ @Test(groups = {"multiCluster"})
+ public void mixedTest02() throws Exception {
+ bundles[0].setInputFeedDataPath(inputPath);
+
+ Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+ //set availability flag to _SUCCESS
+ bundles[0].setInputFeedAvailabilityFlag("_SUCCESS");
+
+ //get feed
+ String feed = bundles[0].getDataSets().get(0);
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
+ "ua1/${cluster.colo}");
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
+ null);
+
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
+ "ua1/${cluster.colo}");
+
+ //create data in colos
+
+ String postFix = "/ua1/ua2";
+ String prefix = bundles[0].getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua2/ua2";
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua3/ua2";
+ HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix);
+
+ //put _SUCCESS in parent folder of UA2
+ HadoopUtil.putFileInFolderHDFS(cluster2FS, 90, 1, prefix, "_SUCCESS");
+
+ postFix = "/ua1/ua3";
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua2/ua3";
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ postFix = "/ua3/ua3";
+ HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix);
+
+ //put _SUCCESS in parent folder of UA3
+ HadoopUtil.putFileInFolderHDFS(cluster3FS, 90, 1, prefix, "_SUCCESS");
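+
+ //with _SUCCESS as the availability flag, replication can only start once the flag
+ //files created above are in place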
+
+ TimeUtil.sleepSeconds(15);
+
+ //submit and schedule feed
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+ TimeUtil.sleepSeconds(10);
+
+ //wait till 1st instance of replication coord is SUCCEEDED
+ String bundleId =
+ InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED);
+
+ List<String> replicationCoordIDTarget =
+ InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper());
+
+ for (int i = 0; i < 30; i++) {
+ if (InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED
+ && InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) {
+ break;
+ }
+ logger.info("waiting for the first replication instances to succeed");
+ TimeUtil.sleepSeconds(20);
+ }
+
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED,
+ "Replication job did not succeed");
+ Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1,
+ replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED,
+ "Replication job did not succeed");
+
+ TimeUtil.sleepSeconds(15);
+
+ /* check that exactly the expected folders are created in ua1 (ua1/ua2 and ua1/ua3);
+ no others should be present, and both should contain _SUCCESS */
+ List<String> inputFolderListForColo1 =
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(0), 1);
+ List<String> inputFolderListForColo2 =
+ InstanceUtil.getInputFoldersForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(1), 1);
+
+ String outPutLocation = InstanceUtil
+ .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0),
+ 0);
+ String outPutBaseLocation = InstanceUtil
+ .getOutputFolderBaseForInstanceForReplication(cluster1,
+ replicationCoordIDTarget.get(0), 0);
+
+ List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation);
+
+ Assert.assertEquals(subfolders.size(), 1);
+ Assert.assertEquals(subfolders.get(0), "ua1");
+
+ Assert.assertFalse(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutBaseLocation,
+ "_SUCCESS"));
+
+ Assert.assertTrue(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutLocation, "_SUCCESS"));
+
+ HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo1);
+ HadoopUtil.flattenAndPutDataInFolder(cluster3FS, OSUtil.NORMAL_INPUT,
+ inputFolderListForColo2);
+
+ //sleep till late starts
+ TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 4));
+
+ //check for run id to be 1
+ Assert.assertTrue(
+ InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == 1
+ && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 1,
+ "run ids should be equal to 1 after the first late rerun");
+ }
+}