You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:07 UTC
[08/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
new file mode 100644
index 0000000..e9a9cf4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismFeedSnSTest extends BaseTestClass {
+
+ // The two servers (colos) under test and their Oozie clients, taken from the
+ // servers/serverOC lists that are not declared in this class (presumably inherited
+ // from BaseTestClass).
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ // Set by tests that shut down cluster1's service so that tearDown() restarts it.
+ private boolean restartRequired;
+ // Target directory for the workflow resources uploaded in uploadWorkflow().
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismFeedSnSTest.class);
+
+ /**
+ * Uploads the Oozie resources directory ({@code OSUtil.RESOURCES_OOZIE}) to
+ * {@code aggregateWorkflowDir} once before the tests of this class run.
+ *
+ * @throws Exception if the upload fails
+ */
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ restartRequired = false;
+ Bundle bundle = BundleUtil.readELBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ /**
+ * Restarts cluster1's service when the finished test flagged that it shut it down,
+ * then removes the bundles created for the test.
+ *
+ * @throws Exception if the restart fails
+ */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (restartRequired) {
+ Util.restartService(cluster1.getFeedHelper());
+ }
+ removeBundles();
+ }
+
+
+ /**
+ * Submits and schedules one feed per bundle and verifies that each feed is RUNNING on
+ * the Oozie of its own colo and not on the other one.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testFeedSnSOnBothColos() throws Exception {
+ //schedule the first feed and verify it on cluster1
+ bundles[0].submitAndScheduleFeed();
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ //the second feed has not been scheduled yet, so it must not run on cluster1
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ bundles[1].submitAndScheduleFeed();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //check if there is no criss cross; the entity type has to be FEED because no process
+ //is ever scheduled by this test, so a PROCESS check could never fail
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ }
+
+ /**
+ * Schedules a feed per bundle, then submits-and-schedules the same feeds again through
+ * prism. The repeated request must succeed, leave exactly one Oozie bundle per feed and
+ * keep both feeds RUNNING.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testSnSAlreadyScheduledFeedOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+
+ //submit and schedule the already running feed once more
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ //ensure only one bundle is there
+ Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
+ Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
+ Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * For each feed in turn: suspends it through prism, submits-and-schedules it again and
+ * verifies that it stays SUSPENDED with a single Oozie bundle, then resumes it through
+ * its own server and verifies that it is RUNNING again.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSnSSuspendedFeedOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //suspend the first feed; the second one must be unaffected
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //submit and schedule the suspended feed again: it must stay suspended
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
+ Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+
+ //resume the first feed directly on its server
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+ .resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+ //repeat the same scenario for the second feed
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+ Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
+ Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ AssertUtil.assertSucceeded(cluster2.getFeedHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+
+ }
+
+ /**
+ * Schedules a feed per bundle, deletes both through prism (checking that each becomes
+ * KILLED without affecting the other) and verifies that submit-and-schedule of the
+ * deleted feeds succeeds again.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testSnSDeletedFeedOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //delete the first feed; the second one keeps running
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //delete the second feed; now both are killed
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+
+ //deleted feeds can be submitted and scheduled again
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ }
+
+ /**
+ * Submit-and-schedule through prism must fail for both feeds when nothing else from the
+ * bundles has been submitted by the test beforehand.
+ * NOTE(review): presumably the failure is caused by the missing cluster entities - confirm.
+ *
+ * @throws Exception if the request cannot be sent
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testScheduleNonExistentFeedOnBothColos() throws Exception {
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ }
+
+ /**
+ * Submits the clusters of the second bundle, shuts down the service of cluster1 and
+ * verifies that the second feed can still be submitted and scheduled through prism and
+ * ends up RUNNING on cluster2.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testFeedSnSOn1ColoWhileOtherColoIsDown() throws Exception {
+ //tearDown() has to bring cluster1 back
+ restartRequired = true;
+ for (String cluster : bundles[1].getClusters()) {
+ AssertUtil
+ .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
+ }
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross; FEED is the only entity type scheduled here, a
+ //PROCESS check could never fail
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * Submits the first feed, shuts down the service of cluster1 and verifies that
+ * submit-and-schedule of that feed through prism fails and that the feed is not running
+ * on the other colo.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testFeedSnSOn1ColoWhileThatColoIsDown() throws Exception {
+ //tearDown() has to bring cluster1 back
+ restartRequired = true;
+ bundles[0].submitFeed();
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ //the entity under test is a feed, so the check has to look for a FEED bundle; a
+ //PROCESS check could never fail
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ }
+
+ /**
+ * Suspends the first feed through prism, then schedules the second feed and verifies
+ * that the second runs on cluster2 while the first stays SUSPENDED on cluster1, with no
+ * feed running on the other colo.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeed() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ bundles[1].submitAndScheduleFeed();
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+ /**
+ * Deletes the first feed through prism, then schedules the second feed and verifies
+ * that the second runs on cluster2 while the first stays KILLED on cluster1, with no
+ * feed running on the other colo.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeed() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ bundles[1].submitAndScheduleFeed();
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+ /**
+ * Submits each feed and then tries to submit-and-schedule it directly on its own
+ * server instead of prism. Both attempts are expected to be answered with status code
+ * 404 and must leave the feed not running.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testFeedSnSOnBothColosUsingColoHelper() throws Exception {
+ //submit-and-schedule directly on cluster1 must be rejected
+ bundles[0].submitFeed();
+ APIResult result = Util.parseResponse(cluster1.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ Assert.assertEquals(result.getStatusCode(), 404);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ //the same applies to cluster2
+ bundles[1].submitFeed();
+ result = Util.parseResponse(cluster2.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ Assert.assertEquals(result.getStatusCode(), 404);
+
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * Schedules both feeds through prism, then for each feed suspends it directly on its
+ * own server and verifies that a repeated submit-and-schedule through prism succeeds
+ * but leaves the feed SUSPENDED. The first feed is resumed in between, so at the end it
+ * is RUNNING while the second one is SUSPENDED.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSnSSuspendedFeedOnBothColosUsingColoHelper() throws Exception {
+
+ //schedule both bundles
+ bundles[0].submitFeed();
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ bundles[1].submitFeed();
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+ //suspend the first feed directly on cluster1
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //scheduling the suspended feed again must not resume it
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.assertSucceeded(
+ cluster1.getFeedHelper().resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+
+ //same scenario for the second feed: it is the suspended feed (bundles[1]) that has
+ //to be scheduled again, not the already resumed first one
+ AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL,
+ bundles[1].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * Schedules both feeds, deletes both through prism, schedules them again through prism
+ * and verifies via the prism status call that each feed is reported as
+ * {@code <colo name>/RUNNING}.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testScheduleDeletedFeedOnBothColosUsingColoHelper() throws Exception {
+
+ //schedule both bundles
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //delete the first feed; the second one keeps running
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //delete the second feed; now both are killed
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ //schedule the deleted feeds again
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+ //the status message returned by prism is expected to be "<colo name>/RUNNING"
+ Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
+ .getStatus(URLS.STATUS_URL, bundles[0].getDataSets().get(0))).getMessage(),
+ cluster1.getClusterHelper().getColoName() + "/RUNNING");
+
+ Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
+ .getStatus(URLS.STATUS_URL, bundles[1].getDataSets().get(0))).getMessage(),
+ cluster2.getClusterHelper().getColoName() + "/RUNNING");
+ }
+
+
+ /**
+ * Submit-and-schedule sent directly to each server (not through prism) for a feed that
+ * the test has not submitted is expected to return status code 404.
+ *
+ * @throws Exception if the request cannot be sent or the response cannot be parsed
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSNSNonExistentFeedOnBothColosUsingColoHelper() throws Exception {
+
+ Assert.assertEquals(Util.parseResponse(cluster1.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)))
+ .getStatusCode(), 404);
+ Assert.assertEquals(Util.parseResponse(cluster2.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)))
+ .getStatusCode(), 404);
+ }
+
+ /**
+ * Submits the clusters of the second bundle, shuts down the service of cluster1 and
+ * verifies that the second feed can still be submitted and scheduled through prism and
+ * ends up RUNNING on cluster2 only.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testFeedSnSOn1ColoWhileOtherColoIsDownUsingColoHelper() throws Exception {
+ //tearDown() has to bring cluster1 back
+ restartRequired = true;
+ for (String cluster : bundles[1].getClusters()) {
+ AssertUtil
+ .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
+ }
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross; FEED is the only entity type scheduled here, a
+ //PROCESS check could never fail
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * Builds a feed that references the cluster of bundle 0 as SOURCE and the cluster of
+ * bundle 1 as TARGET, shuts down the service of cluster1 and verifies that both submit
+ * and schedule of that feed through prism are answered with a partial result.
+ * Afterwards the service is started again and both cluster entities are deleted.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testFeedSnSOn1ColoWhileThatColoIsDownUsingColoHelper() throws Exception {
+ //tearDown() restarts cluster1 even if an assertion below fails
+ restartRequired = true;
+
+ //set the colo on each cluster entity and submit it through prism
+ bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
+ logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
+ logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ String startTimeUA1 = "2012-10-01T12:00Z";
+ String startTimeUA2 = "2012-10-01T12:00Z";
+
+ String feed = bundles[0].getDataSets().get(0);
+ //NOTE(review): the call with a null cluster name presumably removes the clusters
+ //already present in the feed definition - confirm against InstanceUtil.setFeedCluster
+ feed = InstanceUtil.setFeedCluster(feed,
+ XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+ //cluster of bundle 0 as source
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+ "${cluster.colo}",
+ baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ //cluster of bundle 1 as target
+ feed = InstanceUtil
+ .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
+ XmlUtil.createRtention("days(10000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ baseHDFSDir +
+ "/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ logger.info("feed: " + Util.prettyPrintXml(feed));
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //with one of the two servers down both operations are expected to be partial
+ ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+ AssertUtil.assertPartial(response);
+ response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+ AssertUtil.assertPartial(response);
+ //bring cluster1 back before deleting the cluster entities (results are not checked)
+ Util.startService(cluster1.getFeedHelper());
+ prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+ prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[1].getClusters().get(0));
+
+ }
+
+
+ /**
+ * Suspends the first feed directly on cluster1, then schedules the second feed and
+ * verifies that the second runs on cluster2 while the first stays SUSPENDED on
+ * cluster1, with no feed running on the other colo.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeedUsingColoHelper()
+ throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ AssertUtil.assertSucceeded(
+ cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+
+ bundles[1].submitAndScheduleFeed();
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //criss-cross checks look for FEED bundles, like the prism-based variant of this
+ //test; no process is scheduled here, so a PROCESS check could never fail
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+
+ /**
+ * Deletes the first feed, then schedules the second feed and verifies that the second
+ * runs on cluster2 while the first stays KILLED on cluster1, with no feed running on
+ * the other colo.
+ * NOTE(review): despite the "UsingColoHelper" name the delete goes through prism.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeedUsingColoHelper() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ bundles[1].submitAndScheduleFeed();
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
new file mode 100644
index 0000000..d6dbaa5
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.TestNGException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismFeedSuspendTest extends BaseTestClass {
+
+ // The two servers (colos) under test and their Oozie clients, taken from the
+ // servers/serverOC lists that are not declared in this class (presumably inherited
+ // from BaseTestClass).
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ // Target directory for the workflow resources uploaded in uploadWorkflow().
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSuspendTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismFeedSuspendTest.class);
+
+ /**
+ * Uploads the Oozie resources directory ({@code OSUtil.RESOURCES_OOZIE}) to
+ * {@code aggregateWorkflowDir} once before the tests of this class run.
+ *
+ * @throws Exception if the upload fails
+ */
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readELBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ /**
+ * Removes the bundles created for the finished test.
+ */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Run two feeds on different clusters. Delete the 1st feed and try to suspend it: the
+ * attempt should fail. Check that the 2nd feed is still running on the 2nd cluster.
+ * Delete it and try to suspend it too. That attempt should also fail and both feeds
+ * should be killed.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testSuspendDeletedFeedOnBothColos() throws Exception {
+ bundles[0].submitAndScheduleFeed();
+ bundles[1].submitAndScheduleFeed();
+
+ //delete using prism
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ //suspend using prism
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //delete the other feed using prism
+ AssertUtil.assertSucceeded(prism.getFeedHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+ //suspend on the other one
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+ }
+
+ /**
+ * Run two feeds on different clusters. Suspend the 1st feed and then suspend it once
+ * more. Check that both requests succeed and the feed is suspended. Do the same for the
+ * 2nd feed.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendSuspendedFeedOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+ //two iterations: the second one suspends an already suspended feed
+ for (int i = 0; i < 2; i++) {
+ //suspend using prism
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+ );
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+ }
+ //again two iterations, now for the feed of the second bundle
+ for (int i = 0; i < 2; i++) {
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+ );
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+ }
+ }
+
+ /**
+ * Attempt to suspend a nonexistent feed should fail through both prism and the matching
+ * server: feed 0 is tried on cluster1 and feed 1 on cluster2.
+ *
+ * @throws Exception if a request cannot be sent
+ */
+ @Test(groups = "embedded")
+ public void testSuspendNonExistentFeedOnBothColos() throws Exception {
+ //through prism
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+ //through the server that matches each bundle
+ AssertUtil.assertFailed(cluster1.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertFailed(cluster2.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ }
+
+ /**
+ * Attempt to suspend a feed that was submitted but never scheduled should fail through
+ * both prism and the matching server.
+ *
+ * @throws Exception if any Falcon call fails
+ */
+ @Test(groups = "embedded")
+ public void testSuspendSubmittedFeedOnBothColos() throws Exception {
+ //submit only, do not schedule
+ bundles[0].submitFeed();
+ bundles[1].submitFeed();
+
+ //through prism
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertFailed(prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+ //through the server that matches each bundle
+ AssertUtil.assertFailed(cluster1.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertFailed(cluster2.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ }
+
+ /**
+ * Run two feeds on different clusters. Stop server on 1st cluster. Attempt to suspend feed on
+ * stopped server through prism should fail. Check that 2nd feed is running. Suspend it
+ * and check that it is suspended. The stopped server is restarted in any case.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendScheduledFeedOnBothColosWhen1ColoIsDown() throws Exception {
+ try {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //suspend using prism
+ AssertUtil.assertFailed(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+ );
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+ } catch (Exception e) {
+ //wrap the exception itself: e.getCause() is frequently null and would hide the
+ //real failure from the test report
+ logger.error("testSuspendScheduledFeedOnBothColosWhen1ColoIsDown failed", e);
+ throw new TestNGException(e);
+ } finally {
+ Util.restartService(cluster1.getFeedHelper());
+ }
+ }
+
+ /**
+ * Run two feeds on different clusters. Delete 1st feed. Stop server on 1st cluster. Attempt
+ * to suspend deleted feed on stopped server should fail. Delete 2nd feed. Attempt
+ * to suspend deleted 2nd feed should also fail. Check that both feeds are killed.
+ * The stopped server is restarted in any case.
+ *
+ * @throws Exception if any Falcon or Oozie call fails
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendDeletedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+ try {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+ //delete using prism
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0))
+ );
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ //suspend using prism
+ AssertUtil.assertFailed(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper()
+ .delete(Util.URLS.DELETE_URL, bundles[1].getDataSets().get(0))
+ );
+ //suspend on the other one
+ AssertUtil.assertFailed(
+ prism.getFeedHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+ );
+ AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
+ } catch (Exception e) {
+ //wrap the exception itself: e.getCause() is frequently null and would hide the
+ //real failure from the test report
+ logger.error("testSuspendDeletedFeedOnBothColosWhen1ColoIsDown failed", e);
+ throw new TestNGException(e);
+ } finally {
+ Util.restartService(cluster1.getFeedHelper());
+ }
+ }
+
+ /**
+  * Schedules one feed on each of the two clusters. Suspends the 1st feed through prism and
+  * checks that it is SUSPENDED while the 2nd one is still RUNNING, then stops the server of
+  * the 1st cluster. Suspending the 1st feed again through prism must fail. The 2nd feed is
+  * then suspended through prism and both feeds must end up SUSPENDED.
+  *
+  * @throws Exception declared for the helper calls; any exception raised inside the test
+  *                   body is rethrown wrapped in a {@link TestNGException}
+  */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendSuspendedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+     try {
+         //schedule using colohelpers
+         bundles[0].submitAndScheduleFeedUsingColoHelper(cluster1);
+         bundles[1].submitAndScheduleFeedUsingColoHelper(cluster2);
+
+         //suspend using prism
+         AssertUtil.assertSucceeded(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+         );
+         //verify
+         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
+
+         Util.shutDownService(cluster1.getFeedHelper());
+
+         //suspend the already suspended feed while its colo is down
+         AssertUtil.assertFailed(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+         );
+         //suspend on the other one
+         AssertUtil.assertSucceeded(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+         AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
+         AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
+     } catch (Exception e) {
+         e.printStackTrace();
+         // Wrap the exception itself: e.getCause() is null for exceptions without a nested
+         // cause, which would discard the real failure and its stack trace.
+         throw new TestNGException(e);
+     } finally {
+         //restart through the same helper that was used for the shutdown
+         Util.restartService(cluster1.getFeedHelper());
+     }
+ }
+
+ /**
+  * Stops the server of the 1st cluster without submitting any feed. Attempts to suspend
+  * the feeds of both bundles through prism must fail, and so must an attempt to suspend
+  * the 1st bundle's feed directly on the 2nd cluster.
+  *
+  * @throws Exception declared for the helper calls; any exception raised inside the test
+  *                   body is rethrown wrapped in a {@link TestNGException}
+  */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendNonExistentFeedOnBothColosWhen1ColoIsDown()
+     throws Exception {
+     try {
+         Util.shutDownService(cluster1.getFeedHelper());
+
+         //suspend using prism
+         AssertUtil.assertFailed(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0))
+         );
+         AssertUtil.assertFailed(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+
+         //suspend using the colohelper of the colo that is still up
+         AssertUtil.assertFailed(
+             cluster2.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+     } catch (Exception e) {
+         e.printStackTrace();
+         // Wrap the exception itself: e.getCause() is null for exceptions without a nested
+         // cause, which would discard the real failure and its stack trace.
+         throw new TestNGException(e);
+     } finally {
+         //restart through the same helper that was used for the shutdown
+         Util.restartService(cluster1.getFeedHelper());
+     }
+ }
+
+ /**
+  * Submits (but does not schedule) the feeds of both bundles and stops the server of the
+  * 1st cluster. Attempts to suspend the non-scheduled feeds must fail through prism for
+  * both feeds, and through the colohelper of the 2nd cluster for the 2nd feed.
+  *
+  * @throws Exception declared for the helper calls; any exception raised inside the test
+  *                   body is rethrown wrapped in a {@link TestNGException}
+  */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendSubmittedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+     try {
+         bundles[0].submitFeed();
+         bundles[1].submitFeed();
+
+         Util.shutDownService(cluster1.getFeedHelper());
+
+         //suspend using prism
+         AssertUtil.assertFailed(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[0].getDataSets().get(0))
+         );
+         AssertUtil.assertFailed(
+             prism.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+
+         //suspend using the colohelper of the colo that is still up
+         AssertUtil.assertFailed(
+             cluster2.getFeedHelper()
+                 .suspend(Util.URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+     } catch (Exception e) {
+         e.printStackTrace();
+         // Wrap the exception itself: e.getCause() is null for exceptions without a nested
+         // cause, which would discard the real failure and its stack trace.
+         throw new TestNGException(e);
+     } finally {
+         //restart through the same helper that was used for the shutdown
+         Util.restartService(cluster1.getFeedHelper());
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
new file mode 100644
index 0000000..b6bf6d6
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+@Test(groups = "embedded")
+public class PrismFeedUpdateTest extends BaseTestClass {
+
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ FileSystem server1FS = serverFS.get(0);
+ OozieClient OC1 = serverOC.get(0);
+ String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest";
+ String aggregateWorkflowDir = baseTestDir + "/aggregator";
+ public final String cluster1colo = cluster1.getClusterHelper().getColoName();
+ public final String cluster2colo = cluster2.getClusterHelper().getColoName();
+ private static final Logger logger = Logger.getLogger(PrismFeedUpdateTest.class);
+ String feedInputTimedOutPath =
+ baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readELBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[i].setInputFeedDataPath(feedInputTimedOutPath);
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * TODO : complete test case
+ */
+ @Test(enabled = true, timeOut = 1200000)
+ public void updateFeedQueue_dependentMultipleProcess_oneProcessZeroInput() throws Exception {
+ //cluster1colo and cluster2colo are source. feed01 on cluster1colo target cluster2colo,
+ // feed02 on cluster2colo target cluster1colo
+
+ //get 3 unique bundles
+ //set cluster colos
+ bundles[0].setCLusterColo(cluster1colo);
+ logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+
+ bundles[1].setCLusterColo(cluster2colo);
+ logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+
+ //submit 3 clusters
+
+ //get 2 unique feeds
+ String feed01 = bundles[0].getInputFeedFromBundle();
+ String outputFeed = bundles[0].getOutputFeedFromBundle();
+
+ //set source and target for the 2 feeds
+
+ //set clusters to null;
+ feed01 = InstanceUtil
+ .setFeedCluster(feed01,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+ outputFeed = InstanceUtil
+ .setFeedCluster(outputFeed,
+ XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
+ ClusterType.SOURCE, null);
+
+
+ //set new feed input data
+ feed01 = Util.setFeedPathValue(feed01,
+ baseTestDir + "/feed01/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+
+
+ //generate data in both the colos cluster1colo and cluster2colo
+ String prefix = InstanceUtil.getFeedPrefix(feed01);
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
+ HadoopUtil.lateDataReplenish(server1FS, 70, 1, prefix, null);
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+
+ //set clusters for feed01
+ feed01 = InstanceUtil
+ .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
+ null);
+ feed01 = InstanceUtil
+ .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
+ null);
+
+ //set clusters for output feed
+ outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+ outputFeed = InstanceUtil.setFeedCluster(outputFeed,
+ XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
+ XmlUtil.createRtention("hours(10)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null);
+
+
+ //submit and schedule feeds
+ logger.info("feed01: " + Util.prettyPrintXml(feed01));
+ logger.info("outputFeed: " + Util.prettyPrintXml(outputFeed));
+
+ //create 2 process with 2 clusters
+
+ //get first process
+ String process01 = bundles[0].getProcessData();
+
+ //add clusters to process
+
+ String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
+ String processEndTime = TimeUtil.getTimeWrtSystemTime(70);
+
+
+ process01 = InstanceUtil
+ .setProcessCluster(process01, null,
+ XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z"));
+ process01 = InstanceUtil
+ .setProcessCluster(process01, Util.readEntityName(bundles[0].getClusters().get(0)),
+ XmlUtil.createProcessValidity(processStartTime, processEndTime));
+ process01 = InstanceUtil
+ .setProcessCluster(process01, Util.readEntityName(bundles[1].getClusters().get(0)),
+ XmlUtil.createProcessValidity(processStartTime, processEndTime));
+
+ //get 2nd process :
+ String process02 = process01;
+ process02 = InstanceUtil
+ .setProcessName(process02, "zeroInputProcess" + new Random().nextInt());
+ List<String> feed = new ArrayList<String>();
+ feed.add(outputFeed);
+ final ProcessMerlin processMerlin = new ProcessMerlin(process02);
+ processMerlin.setProcessFeeds(feed, 0, 0, 1);
+ process02 = processMerlin.toString();
+
+
+ //submit and schedule both process
+ logger.info("process: " + Util.prettyPrintXml(process01));
+ logger.info("process: " + Util.prettyPrintXml(process02));
+
+
+ logger.info("Wait till process goes into running ");
+
+ //change feed location path
+ outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue");
+
+ logger.info("updated feed: " + Util.prettyPrintXml(outputFeed));
+
+ //update feed first time
+ prism.getFeedHelper().update(outputFeed, outputFeed);
+ }
+
+
+ /**
+ * schedules a feed and dependent process. Process start and end are in past
+ * Test for bug https://issues.apache.org/jira/browse/FALCON-500
+ */
+ @Test
+ public void dependentProcessSucceeded()
+ throws Exception {
+ bundles[0].setProcessValidity("2014-06-01T04:00Z","2014-06-01T04:02Z");
+ bundles[0].submitAndScheduleAllFeeds();
+ bundles[0].submitAndScheduleProcess();
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+ 0, 0);
+ InstanceUtil.waitForBundleToReachState(cluster1, bundles[0].getProcessName(),
+ Job.Status.SUCCEEDED, 20);
+
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.addProperty("someProp","someVal");
+ AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
+ //check for new feed bundle creation
+ Assert.assertEquals(OozieUtil.getNumberOfBundle(prism, EntityType.FEED,
+ feed.getName()),2);
+ Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
+ bundles[0].getProcessName()),1);
+ }
+
+ /**
+ * schedules a feed and dependent process. Update availability flag and check for process update
+ * Test for bug https://issues.apache.org/jira/browse/FALCON-278
+ */
+ @Test
+ public void updateAvailabilityFlag()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(30);
+ bundles[0].setProcessValidity(startTime, endTime);
+ bundles[0].submitAndScheduleAllFeeds();
+ bundles[0].submitAndScheduleProcess();
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+ 0, 0);
+
+ FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+ feed.setAvailabilityFlag("mytestflag");
+ AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
+ //check for new feed bundle creation
+ Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.FEED,
+ feed.getName()),2);
+ Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
+ bundles[0].getProcessName()),2);
+ }
+
+}