You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:05 UTC
[06/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
new file mode 100644
index 0000000..efa5b8f
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismProcessSnSTest extends BaseTestClass {
+
+ // Helpers and Oozie clients for the two colos exercised by these tests.
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ // HDFS location of the aggregator workflow shared by every test in this class.
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismProcessSnSTest.class);
+
+ // Uploads the shared aggregator workflow to every cluster once per class run.
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ // Before each test: build one uniquely-named bundle per colo from the
+ // late-data template so tests never collide on entity names.
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readLateDataBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+ }
+
+ // After each test: delete the entities created in setUp so state does not
+ // leak into the next test.
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ // Submit-and-schedule one process per colo and verify each runs only on its
+ // own colo (no criss-cross scheduling).
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessSnSOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ bundles[1].submitAndScheduleProcess();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ ServiceResponse response =
+ prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[1].getProcessData());
+ logger.info(response.getMessage());
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ }
+
+ // Submit each process first, then schedule it via prism; verify each runs
+ // only on its own colo.
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception {
+ // submit the first process and schedule it through prism
+ bundles[0].submitProcess(true);
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ // submit and schedule the second process
+ // BUG FIX: the original re-scheduled bundles[0] here instead of bundles[1],
+ // so the second process was never actually scheduled or verified.
+ bundles[1].submitProcess(true);
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ }
+
+ // Same flow as testProcessSnSForSubmittedProcessOnBothColos.
+ // NOTE(review): despite the name, scheduling goes through the prism helper,
+ // not a colo helper — confirm intent or rename the test.
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper()
+ throws Exception {
+ // submit the first process and schedule it through prism
+ bundles[0].submitProcess(true);
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ // submit and schedule the second process
+ // BUG FIX: the original submitted bundles[1] twice (with duplicated asserts
+ // in between) and then re-scheduled bundles[0]; the duplicate submit is
+ // dropped and the schedule now targets bundles[1].
+ bundles[1].submitProcess(true);
+
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ }
+
+ // Schedule one process per colo, then re-issue a schedule for the already
+ // scheduled process and verify no second Oozie bundle is created.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testProcessSnSAlreadyScheduledOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ bundles[1].submitAndScheduleProcess();
+
+ //now check if they have been scheduled correctly or not
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ //check if there is no criss cross
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+
+ //reschedule trial
+
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ // NOTE(review): getFeedHelper() is used to obtain the Oozie client for a
+ // PROCESS entity — presumably all entity helpers of a colo share one Oozie
+ // client; confirm against ColoHelper.
+ Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ // Submit-and-schedule of an already-suspended process must not create a new
+ // Oozie bundle and must leave the process suspended.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSnSSuspendedProcessOnBothColos() throws Exception {
+ //schedule both bundles
+ bundles[0].submitAndScheduleProcess();
+ bundles[1].submitAndScheduleProcess();
+
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ //now check if they have been scheduled correctly or not
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+ // NOTE(review): resume() is invoked with URLS.SUSPEND_URL, so this call
+ // cannot actually resume the entity. The final assertions below expect
+ // bundles[0] to remain SUSPENDED, which only holds because of this —
+ // confirm whether RESUME_URL was intended and, if so, update the
+ // expectations accordingly.
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper()
+ .resume(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+
+ Assert.assertEquals(OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS).size(), 1);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ }
+
+ // Delete both scheduled processes, then submit-and-schedule them again and
+ // verify each comes back RUNNING on its own colo.
+ @Test(groups = {"prism", "0.2", "embedded"})
+ public void testSnSDeletedProcessOnBothColos() throws Exception {
+ //schedule both bundles
+ final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING";
+ final String cluster2Running = cluster2.getClusterHelper().getColoName() + "/RUNNING";
+ bundles[0].submitAndScheduleProcess();
+
+ Assert.assertEquals(Util.parseResponse(
+ prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())).getMessage(),
+ cluster1Running
+ );
+
+ bundles[1].submitAndScheduleProcess();
+ Assert.assertEquals(Util.parseResponse(
+ prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())).getMessage(),
+ cluster2Running
+ );
+
+ // delete both processes; the Oozie jobs should be KILLED
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ // re-submit-and-schedule after deletion; both must run again on their colos
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+
+ Assert.assertEquals(Util.parseResponse(
+ prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())
+ ).getMessage(),
+ cluster1Running
+ );
+ Assert.assertEquals(Util.parseResponse(
+ prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())
+ ).getMessage(),
+ cluster2Running
+ );
+
+ }
+
+ // Submit-and-schedule of never-submitted entities directly on the colos
+ // must return HTTP 404.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testScheduleNonExistentProcessOnBothColos() throws Exception {
+ Assert.assertEquals(Util.parseResponse(cluster2.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()))
+ .getStatusCode(), 404);
+ Assert.assertEquals(Util.parseResponse(cluster1.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()))
+ .getStatusCode(), 404);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
new file mode 100644
index 0000000..e77ae13
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+public class PrismProcessSuspendTest extends BaseTestClass {
+
+ // Helpers and Oozie clients for the two colos exercised by these tests.
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster1OC = serverOC.get(0);
+ OozieClient cluster2OC = serverOC.get(1);
+ // Set by tests that shut a service down; tearDown restarts it when true.
+ private boolean restartRequired;
+ String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSuspendTest/aggregator";
+ private static final Logger logger = Logger.getLogger(PrismProcessSuspendTest.class);
+
+ // Uploads the shared aggregator workflow to every cluster once per class run.
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ // Before each test: reset the restart flag and build one uniquely-named
+ // bundle per colo from the late-data template.
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ restartRequired = false;
+ Bundle bundle = BundleUtil.readLateDataBundle();
+ for (int i = 0; i < 2; i++) {
+ bundles[i] = new Bundle(bundle, servers.get(i));
+ bundles[i].generateUniqueBundle();
+ bundles[i].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ }
+
+ // After each test: restart the colo-1 service if a test shut it down, then
+ // delete the entities created in setUp.
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (restartRequired) {
+ Util.restartService(cluster1.getProcessHelper());
+ }
+ removeBundles();
+ }
+
+
+ // Suspend a process, take its colo down, verify further suspends of that
+ // process fail while (repeated) suspends of the healthy colo's process
+ // still succeed and leave states unchanged.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendSuspendedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+
+ //suspend using prismHelper
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ Util.shutDownService(cluster1.getProcessHelper());
+
+ // suspend of the down colo's process must fail through prism
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+ // the second iteration suspends an already-SUSPENDED process, which is
+ // expected to succeed (idempotent suspend)
+ for (int i = 0; i < 2; i++) {
+ //suspend on the other one
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ }
+ }
+
+
+ // Suspend a scheduled process through prism, then suspend it again to check
+ // the operation is idempotent; the other colo's process keeps running.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendScheduledProcessOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+ //suspend using prismHelper
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ // suspend the same (already suspended) process again — idempotency check.
+ // NOTE(review): the original comment said "suspend on the other one", but
+ // the call below targets bundles[0] again, not bundles[1].
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ Assert.assertTrue(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())).getMessage()
+ .contains("SUSPENDED"));
+ Assert.assertTrue(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())).getMessage()
+ .contains("RUNNING"));
+ }
+
+ // Suspending a deleted process must fail; the deleted entity's Oozie job
+ // stays KILLED and the other colo is unaffected.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendDeletedProcessOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+ //delete using coloHelpers
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+
+ //suspend using prismHelper
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[1].getProcessData()));
+ //suspend on the other one
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+ }
+
+ // Suspend each process twice; the second suspend of an already-SUSPENDED
+ // process is expected to succeed (idempotent) and leave states unchanged.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendSuspendedProcessOnBothColos() throws Exception {
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+
+ for (int i = 0; i < 2; i++) {
+ //suspend using prismHelper
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+
+ for (int i = 0; i < 2; i++) {
+ //suspend on the other one
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil
+ .checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
+ AssertUtil
+ .checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ }
+ }
+
+ // Suspending entities that were never submitted must fail, both through
+ // prism and directly against each colo.
+ @Test(groups = "embedded")
+ public void testSuspendNonExistentProcessOnBothColos() throws Exception {
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+ // Each colo targets its own bundle. CONSISTENCY FIX: the original suspended
+ // bundles[0] on cluster2 as well; bundles[1] matches the per-colo pattern
+ // used by testSuspendSubmittedProcessOnBothColos (behavior is the same —
+ // neither entity exists, so both calls must fail regardless).
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ }
+
+ // Suspending processes that are submitted but not scheduled must fail,
+ // both through prism and directly against each colo.
+ @Test(groups = "embedded")
+ public void testSuspendSubmittedProcessOnBothColos() throws Exception {
+ bundles[0].submitProcess(true);
+ bundles[1].submitProcess(true);
+
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+ AssertUtil.assertFailed(cluster1.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ }
+
+ // With colo-1's Falcon server down, suspending its process via prism must
+ // fail, while the healthy colo's process can still be suspended.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendScheduledProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+ Util.shutDownService(cluster1.getProcessHelper());
+
+ //suspend using prismHelper
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ //suspend on the other one
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
+ // the Oozie job on colo-1 keeps RUNNING: only the Falcon server is down,
+ // the Oozie server is still reachable
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ }
+
+ // Suspending deleted processes must fail even when one colo is down; the
+ // deleted entities' Oozie jobs stay KILLED.
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void testSuspendDeletedProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+ //schedule using colohelpers
+ bundles[0].submitAndScheduleProcessUsingColoHelper(cluster1);
+ bundles[1].submitAndScheduleProcessUsingColoHelper(cluster2);
+
+ //delete using coloHelpers
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ Util.shutDownService(cluster1.getProcessHelper());
+
+ //suspend using prismHelper
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ //verify
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(Util.URLS.DELETE_URL, bundles[1].getProcessData()));
+ //suspend on the other one
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
+ }
+
+
+ // Suspending never-submitted entities must fail both via prism and on the
+ // colo that is still up.
+ @Test(groups = "distributed")
+ public void testSuspendNonExistentProcessOnBothColosWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(cluster1.getProcessHelper());
+
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ }
+
+ // Suspending submitted-but-not-scheduled entities must fail when one colo
+ // is down.
+ // NOTE(review): the name says "Feed" but the test submits and suspends
+ // PROCESS entities — presumably a copy-paste of a feed test's name; confirm
+ // and consider renaming.
+ @Test(groups = "distributed")
+ public void testSuspendSubmittedFeedOnBothColosWhen1ColoIsDown() throws Exception {
+ restartRequired = true;
+ bundles[0].submitProcess(true);
+ bundles[1].submitProcess(true);
+
+ Util.shutDownService(cluster1.getProcessHelper());
+
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(
+ prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertFailed(cluster2.getProcessHelper()
+ .suspend(Util.URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
new file mode 100644
index 0000000..8156937
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -0,0 +1,602 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.util.List;
+
+@Test(groups = "distributed")
+public class PrismSubmitTest extends BaseTestClass {
+
+ // Helpers for the two colos; HDFS paths used by the submission tests.
+ ColoHelper cluster1 = servers.get(0);
+ ColoHelper cluster2 = servers.get(1);
+ String baseTestDir = baseHDFSDir + "/PrismSubmitTest";
+ String randomHDFSPath = baseTestDir + "/someRandomPath";
+ String aggregateWorkflowDir = baseTestDir + "/aggregator";
+ // Set by tests that shut services down; tearDown restarts them when true.
+ boolean restartRequired;
+ private static final Logger logger = Logger.getLogger(PrismSubmitTest.class);
+
+ // Uploads the shared aggregator workflow to every cluster once per class run.
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ // Before each test: build a single uniquely-named EL bundle on colo-1.
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ restartRequired = false;
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster1);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+
+ // After each test: restart services a test shut down, then delete entities.
+ // NOTE(review): tests shut down via getClusterHelper() but this restarts via
+ // getFeedHelper() — presumably both helpers point at the same Falcon server
+ // process; confirm against ColoHelper.
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (restartRequired) {
+ Util.startService(prism.getFeedHelper());
+ Util.startService(cluster1.getFeedHelper());
+ }
+ removeBundles();
+ }
+
+ // With the prism server down, a cluster submission through it must fail
+ // with a connection error and must not change the colo's entity store.
+ @Test(groups = "distributed")
+ public void submitCluster_1prism1coloPrismdown() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(prism.getClusterHelper());
+
+ List<String> beforeSubmit = cluster1.getClusterHelper().getStoreInfo();
+ try {
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ // BUG FIX: the original passed vacuously when no exception was thrown;
+ // the submission is required to fail while prism is down.
+ Assert.fail("Cluster submission should have failed because prism is down");
+ } catch (ConnectException e) {
+ Assert.assertTrue(e.getMessage().contains("Connection to "
+ + prism.getClusterHelper().getHostname() + " refused"), e.getMessage());
+ }
+ List<String> afterSubmit = cluster1.getClusterHelper().getStoreInfo();
+
+ // the colo's store must be unchanged
+ AssertUtil.compareDataStoreStates(beforeSubmit, afterSubmit, 0);
+ }
+
+ // Resubmitting the same cluster name with modified content succeeds and
+ // leaves both entity stores unchanged (no duplicate entries).
+ @Test(groups = "distributed")
+ public void submitCluster_resubmitDiffContent() throws Exception {
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+
+ // modify the cluster definition and resubmit under the same name
+ bundles[0].setCLusterWorkingPath(bundles[0].getClusters().get(0), randomHDFSPath);
+ logger.info("modified cluster Data: "
+ + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ }
+
+ // A cluster submission while one colo is down yields PARTIAL; once the colo
+ // is back, resubmitting the same cluster must fully SUCCEED.
+ @Test(groups = "distributed")
+ public void submitCluster_resubmitAlreadyPARTIALWithAllUp() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ }
+
+ // Submit cluster + feeds, take colo-1 down, then delete and resubmit the
+ // process through prism; only prism's store should change while the colo is
+ // down, and the later delete (colo back up) should remove it from prism.
+ // NOTE(review): "Ans" in the method name looks like a typo for "And"; kept
+ // to avoid breaking external references to the test name.
+ @Test(groups = "distributed")
+ public void submitProcess_1ColoDownAfter2FeedSubmitStartAfterProcessSubmitAnsDeleteProcess()
+ throws Exception {
+ restartRequired = true;
+ // submit cluster and both feeds while everything is up
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ // take colo-1 down before touching the process entity
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(12);
+
+
+ List<String> beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+
+ // BUG FIX: the original additionally ran AssertUtil.assertFailed(r) here on
+ // the stale feed-submit response (which had SUCCEEDED), so the test failed
+ // unconditionally; the contradictory assertion is removed.
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData()));
+ List<String> afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ // only prism's store gains the process; neither colo store changes
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.getProcessName(bundles[0].getProcessData()), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+
+ // bring colo-1 back up and delete the process; prism's store drops it
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(15);
+
+ beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ r = prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.getProcessName(bundles[0].getProcessData()), -1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+ }
+
+ /**
+ * Happy path: submit cluster, both feeds, and the process via prism with all
+ * colos up. Feeds should land in colo1 and prism stores (+2 each); the process
+ * should land in colo1 and prism stores (+1 each); colo2's store is untouched.
+ */
+ @Test(groups = "distributed")
+ public void submitProcess_ideal() throws Exception {
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> beforeSubmitCluster1 = cluster1.getFeedHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getFeedHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getFeedHelper().getStoreInfo();
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> afterSubmitCluster1 = cluster1.getFeedHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getFeedHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getFeedHelper().getStoreInfo();
+
+ // Two feeds submitted -> two new entries in colo1 and prism stores.
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 2);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 2);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+
+ beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ r = prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1,
+ Util.getProcessName(bundles[0].getProcessData()), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.getProcessName(bundles[0].getProcessData()), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+
+ }
+
+ /**
+ * Submit a cluster while colo1 is down: expect PARTIAL, with the entity stored
+ * in prism and colo2 (+1 each). After restarting colo1 the resubmit should
+ * succeed without adding new store entries (entity already known).
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_1prism1coloColoDown() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(cluster1.getClusterHelper());
+
+ List<String> beforeSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+
+ List<String> afterSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+
+ Util.startService(cluster1.getClusterHelper());
+
+ TimeUtil.sleepSeconds(10);
+
+ beforeSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ beforeSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ afterSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ afterSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ //should be succeeded
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ // No store changes on resubmit of an already-known entity.
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+ }
+
+ /**
+ * Submit then delete a cluster, then submit it again: the resubmit should
+ * succeed and re-add the entity to both colo stores and the prism store (+1 each).
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_1prism1coloSubmitDeleted() throws Exception {
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+
+ List<String> beforeSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ List<String> afterSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ }
+
+ /**
+ * Submitting a process before its cluster is registered must fail with a
+ * "is not registered" error.
+ */
+ @Test(groups = "embedded")
+ public void submitProcess_woClusterSubmit() throws Exception {
+ ServiceResponse r =
+ prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+
+ Assert.assertTrue(r.getMessage().contains("FAILED"));
+ Assert.assertTrue(r.getMessage().contains("is not registered"));
+ }
+
+ /**
+ * Submitting a process after its cluster but before its feeds must fail with
+ * a "is not registered" error (the referenced feeds are unknown).
+ */
+ @Test(groups = "embedded")
+ public void submitProcess_woFeedSubmit() throws Exception {
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ r = prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+ Assert.assertTrue(r.getMessage().contains("FAILED"));
+ Assert.assertTrue(r.getMessage().contains("is not registered"));
+ }
+
+ /**
+ * Submit a second cluster (bundles[1], pinned to colo2) while colo1's server is
+ * down: expect PARTIAL with the entity landing in prism and colo2 only. Then
+ * restart colo1 and submit bundles[0] (pinned to colo1): expect SUCCEEDED with
+ * the entity added to colo1 only (prism/colo2 stores unchanged by that call).
+ */
+ @Test(groups = {"prism", "0.2", "distributed"})
+ public void submitCluster_resubmitAlreadyPARTIAL() throws Exception {
+ restartRequired = true;
+ bundles[1] = new Bundle(bundles[0], cluster2);
+ bundles[1].generateUniqueBundle();
+ bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+
+ List<String> beforeCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> beforePrism = prism.getClusterHelper().getStoreInfo();
+ List<String> beforeCluster2 = cluster2.getClusterHelper().getStoreInfo();
+
+ Util.shutDownService(cluster1.getFeedHelper());
+
+ bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
+ logger.info("cluster b2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+
+ List<String> parCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> parPrism = prism.getClusterHelper().getStoreInfo();
+ List<String> parCluster2 = cluster2.getClusterHelper().getStoreInfo();
+
+ AssertUtil.compareDataStoreStates(parCluster1, beforeCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforePrism, parPrism,
+ Util.readEntityName(bundles[1].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeCluster2, parCluster2,
+ Util.readEntityName(bundles[1].getClusters().get(0)), 1);
+
+ Util.restartService(cluster1.getFeedHelper());
+
+ bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
+ logger.info("cluster b1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> afterCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterPrism = prism.getClusterHelper().getStoreInfo();
+ List<String> afterCluster2 = cluster2.getClusterHelper().getStoreInfo();
+
+ AssertUtil.compareDataStoreStates(parCluster1, afterCluster1,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ // NOTE(review): argument order (after, par) is reversed relative to the other
+ // compareDataStoreStates calls in this class; harmless for a 0 delta, but
+ // worth confirming compareDataStoreStates treats a 0 diff symmetrically.
+ AssertUtil.compareDataStoreStates(afterPrism, parPrism, 0);
+ AssertUtil.compareDataStoreStates(afterCluster2, parCluster2, 0);
+ }
+
+ /**
+ * Submit while colo1 is down (expect PARTIAL; only colo2's store gains the
+ * entity), then restart colo1 and resubmit (expect SUCCEEDED; colo1's store
+ * catches up while colo2's is unchanged).
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_polarization() throws Exception {
+ restartRequired = true;
+ //shutdown one colo and submit
+ Util.shutDownService(cluster1.getClusterHelper());
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ // NOTE(review): despite the name, this variable holds cluster2's store, not prism's.
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+
+ //resubmit PARTIAL success
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+ beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ }
+
+ /**
+ * After a PARTIAL submit (colo1 down), restart colo1, change the cluster's
+ * working path, and resubmit: expect SUCCEEDED with colo1's store gaining the
+ * entity and colo2's store unchanged.
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_resubmitDiffContentPARTIAL() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ // NOTE(review): despite the name, this reads cluster2's store, not prism's.
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ // Modify the entity definition before resubmitting.
+ bundles[0].setCLusterWorkingPath(bundles[0].getClusters().get(0), randomHDFSPath);
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ }
+
+ /**
+ * With colo1 down throughout: a submit yields PARTIAL, a delete also yields
+ * PARTIAL (removing the entity from colo2's store, -1), and a further submit
+ * yields PARTIAL again.
+ */
+ @Test
+ public void submitCluster_PARTIALDeletedOfPARTIALSubmit() throws Exception {
+ restartRequired = true;
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ // NOTE(review): despite the name, this reads cluster2's store, not prism's.
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ r = prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), -1);
+
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+ }
+
+ /**
+ * Submit successfully with all colos up, take colo1 down and delete (expect
+ * PARTIAL; colo2's store loses the entity, -1), then restart colo1 and
+ * resubmit (expect SUCCEEDED; colo2's store regains it, +1).
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_submitPartialDeleted() throws Exception {
+ restartRequired = true;
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ TimeUtil.sleepSeconds(30);
+
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ // NOTE(review): despite the name, this reads cluster2's store, not prism's.
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ r = prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("PARTIAL"));
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), -1);
+
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ }
+
+ /**
+ * Resubmitting an already-registered cluster should succeed without changing
+ * any entity store.
+ */
+ @Test(groups = "embedded")
+ public void submitCluster_resubmitAlreadySucceeded() throws Exception {
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> beforeSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ // NOTE(review): despite the name, this reads cluster2's store, not prism's.
+ List<String> beforeSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+ List<String> afterSubmitCluster = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = cluster2.getClusterHelper().getStoreInfo();
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster, afterSubmitCluster, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ }
+
+ /**
+ * Submit a cluster with all services up: expect SUCCEEDED with the entity
+ * appearing in both colo stores and the prism store (+1 each).
+ */
+ @Test(groups = "distributed")
+ public void submitCluster_1prism1coloAllUp() throws Exception {
+ List<String> beforeSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ List<String> afterSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2,
+ Util.readEntityName(bundles[0].getClusters().get(0)), 1);
+ }
+
+ /**
+ * Submitting a cluster that has already been submitted should succeed and
+ * leave all three entity stores unchanged.
+ */
+ @Test(groups = "embedded")
+ public void submitCluster_1prism1coloAlreadySubmitted() throws Exception {
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ List<String> beforeSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+
+ List<String> afterSubmitCluster1 = cluster1.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getClusterHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getClusterHelper().getStoreInfo();
+
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+ }
+
+ /**
+ * Submit cluster and one feed, take colo1 down, then: the second feed submit
+ * fails but is still retained in the prism store (+1); after restarting colo1,
+ * the process submit also fails (second feed never reached the colos) while
+ * again being retained in the prism store (+1). Colo stores stay unchanged.
+ */
+ @Test
+ public void submitProcess_1ColoDownAfter1FeedSubmitStartAfter2feed() throws Exception {
+ restartRequired = true;
+ ServiceResponse r =
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"), r.getMessage());
+
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"), r.getMessage());
+
+ Util.shutDownService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(30);
+
+ List<String> beforeSubmitCluster1 = cluster1.getFeedHelper().getStoreInfo();
+ List<String> beforeSubmitCluster2 = cluster2.getFeedHelper().getStoreInfo();
+ List<String> beforeSubmitPrism = prism.getFeedHelper().getStoreInfo();
+
+ // With colo1 down, the feed submit fails overall...
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getDataSets().get(1));
+ Assert.assertTrue(r.getMessage().contains("FAILED"));
+
+ List<String> afterSubmitCluster1 = cluster1.getFeedHelper().getStoreInfo();
+ List<String> afterSubmitCluster2 = cluster2.getFeedHelper().getStoreInfo();
+ List<String> afterSubmitPrism = prism.getFeedHelper().getStoreInfo();
+
+ // ...yet the entity is retained in the prism store (+1) but in neither colo.
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.readEntityName(bundles[0].getDataSets().get(1)), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+
+ Util.startService(cluster1.getClusterHelper());
+ TimeUtil.sleepSeconds(15);
+
+ beforeSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ beforeSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ beforeSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ r = prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getProcessData());
+ Assert.assertTrue(r.getMessage().contains("FAILED"), r.getMessage());
+
+ afterSubmitCluster1 = cluster1.getProcessHelper().getStoreInfo();
+ afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo();
+ afterSubmitPrism = prism.getProcessHelper().getStoreInfo();
+
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0);
+ AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism,
+ Util.getProcessName(bundles[0].getProcessData()), 1);
+ AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0);
+ }
+
+ @DataProvider(name = "errorDP")
+ public Object[][] getTestData(Method m) {
+ Object[][] testData = new Object[2][1];
+ testData[0][0] = "EmptyInputTagProcess";
+ testData[1][0] = "EmptyOutputTagProcess";
+
+ return testData;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
new file mode 100644
index 0000000..5c3cf1a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+
+@Test(groups = "embedded")
+public class ProcessPartitionExpVariableTest extends BaseTestClass {
+ private static final Logger logger = Logger.getLogger(ProcessPartitionExpVariableTest.class);
+
+ // Single-colo fixtures: first configured server plus its HDFS and Oozie clients.
+ ColoHelper cluster = servers.get(0);
+ FileSystem clusterFS = serverFS.get(0);
+ OozieClient clusterOC = serverOC.get(0);
+ private String baseTestDir = baseHDFSDir + "/ProcessPartitionExpVariableTest";
+ String aggregateWorkflowDir = baseTestDir + "/aggregator";
+
+ // Uploads the Oozie aggregator workflow once for the whole class.
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ // Fresh, uniquely named EL bundle pointed at the uploaded workflow per test.
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ // Remove submitted entities and test data after each test.
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ removeBundles();
+ HadoopUtil.deleteDirIfExists(baseTestDir, clusterFS);
+ }
+
+ /**
+ * Test case: set 1 optional and 1 compulsory input for process. Set partitions for each
+ * input as expression language variable linked with process properties. Check that process
+ * runs fine with partition provided for compulsory input as exp variable and succeeds in
+ * spite of nonexistent partition provided for optional input.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = true)
+ public void ProcessPartitionExpVariableTest_OptionalCompulsoryPartition() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-4);
+ String endTime = TimeUtil.getTimeWrtSystemTime(30);
+
+ bundles[0].generateRequiredBundle(1, 2, 1, baseTestDir, 1, startTime, endTime);
+ bundles[0].setProcessInputNames("inputData0", "inputData");
+ // Process property "var1" backs the ${var1} partition expression below.
+ Property p = new Property();
+ p.setName("var1");
+ p.setValue("hardCoded");
+
+ bundles[0].addProcessProperty(p);
+ bundles[0].setProcessInputPartition("${var1}", "${fileTime}");
+
+ for (int i = 0; i < bundles[0].getDataSets().size(); i++)
+ logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
+
+ logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ bundles[0].submitAndScheduleBundle(prism, false);
+
+ // Create input data only for the second (compulsory) input's window; the
+ // optional input's partition is left nonexistent on purpose.
+ List<String> dataDates = generateDateAndOneDayAfter(
+ TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(startTime, -25)),
+ TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(endTime, 25)), 5);
+
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir
+ + "/input1/", dataDates);
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC,
+ Util.getProcessName(bundles[0].getProcessData()), 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ }
+
+
+ /**
+ * Generates patterns of the form .../2014/03/06/21/57/2014-Mar-07 between two supplied dates.
+ * There are two dates and the second date is one day after the first one
+ *
+ * @param startDate start date
+ * @param endDate end date
+ * @param minuteSkip interval with which directories are created
+ * @return list of such dates
+ */
+ private static List<String> generateDateAndOneDayAfter(DateTime startDate, DateTime endDate,
+ int minuteSkip) {
+ final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm/");
+ final DateTimeFormatter formatter2 = DateTimeFormat.forPattern("yyyy-MMM-dd");
+ logger.info("generating data between " + formatter.print(startDate) + " and " +
+ formatter.print(endDate));
+
+ List<String> dates = new ArrayList<String>();
+ while (!startDate.isAfter(endDate)) {
+ final DateTime nextDate = startDate.plusMinutes(minuteSkip);
+ dates.add(formatter.print(nextDate) + formatter2.print(nextDate.plusDays(1)));
+ // Guard: a 0 skip would pin nextDate to startDate and loop forever,
+ // so bump it to 1 after the first iteration.
+ if (minuteSkip == 0) {
+ minuteSkip = 1;
+ }
+ startDate = nextDate;
+ }
+ return dates;
+ }
+
+ //TODO: ProcessPartitionExpVariableTest_OptionalPartition()
+ //TODO: ProcessPartitionExpVariableTest_CompulsoryPartition()
+ //TODO: ProcessPartitionExpVariableTest_moreThanOnceVariable()
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
new file mode 100644
index 0000000..0e2d8eb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+@Test(groups = "embedded")
+public class RescheduleKilledProcessTest extends BaseTestClass {
+
+ // First colo/server from the test configuration; all entities run against it.
+ ColoHelper cluster = servers.get(0);
+ // HDFS handle for that colo, used to stage input data.
+ FileSystem clusterFS = serverFS.get(0);
+ // HDFS location of the aggregator workflow uploaded once per class.
+ String aggregateWorkflowDir = baseHDFSDir + "/RescheduleKilledProcessTest/aggregator";
+ private static final Logger logger = Logger.getLogger(RescheduleKilledProcessTest.class);
+
+ /** Uploads the shared oozie aggregator workflow to HDFS once before all tests. */
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ /** Prepares a fresh, uniquely named bundle bound to the test cluster for each test method. */
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ /** Removes entities submitted during the test. */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Run process and delete it. Submit and schedule once more; both calls must succeed.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = false, timeOut = 1200000)
+ public void rescheduleKilledProcess() throws Exception {
+ // Validity window straddles "now": 11 minutes in the past to 6 in the future.
+ String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
+ String processEndTime = TimeUtil.getTimeWrtSystemTime(6);
+ String process = bundles[0].getProcessData();
+ process = InstanceUtil.setProcessName(process, "zeroInputProcess" + new Random().nextInt());
+ List<String> feed = new ArrayList<String>();
+ feed.add(bundles[0].getOutputFeedFromBundle());
+ // Rebuild the process feeds: no inputs, one output (a zero-input process).
+ final ProcessMerlin processMerlin = new ProcessMerlin(process);
+ processMerlin.setProcessFeeds(feed, 0, 0, 1);
+ process = processMerlin.toString();
+
+ // First call clears existing cluster entries; second binds the real cluster
+ // with the bounded validity window.
+ process = InstanceUtil.setProcessCluster(process, null,
+ XmlUtil.createProcessValidity(processStartTime, "2099-01-01T00:00Z"));
+ process = InstanceUtil
+ .setProcessCluster(process, Util.readEntityName(bundles[0].getClusters().get(0)),
+ XmlUtil.createProcessValidity(processStartTime, processEndTime));
+ bundles[0].setProcessData(process);
+
+ bundles[0].submitFeedsScheduleProcess(prism);
+
+ // Delete the running process, then resubmit and reschedule the same definition.
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().schedule(URLS.SCHEDULE_URL,
+ bundles[0].getProcessData()));
+ }
+
+ /**
+ * Submit and schedule a process. Then remove it. Repeat all procedure twice.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = true, timeOut = 1200000)
+ public void rescheduleKilledProcess02() throws Exception {
+ // Validity window straddles "now": 11 minutes in the past to 6 in the future.
+ bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-11),
+ TimeUtil.getTimeWrtSystemTime(6));
+
+ bundles[0].setInputFeedDataPath(
+ baseHDFSDir + "/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+
+ // Stage fresh input data so scheduled instances have something to consume.
+ String prefix = InstanceUtil.getFeedPrefix(bundles[0].getInputFeedFromBundle());
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 40, 1, prefix, null);
+
+ logger.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+
+ bundles[0].submitFeedsScheduleProcess(prism);
+
+ // Delete/submit/schedule cycle, performed twice; every call must succeed.
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().schedule(URLS.SCHEDULE_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().submitEntity(URLS.SUBMIT_URL,
+ bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().schedule(URLS.SCHEDULE_URL,
+ bundles[0].getProcessData()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
new file mode 100644
index 0000000..1e37011
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+
+@Test(groups = "embedded")
+public class RescheduleProcessInFinalStatesTest extends BaseTestClass {
+
+ // First colo/server from the test configuration; all entities run against it.
+ ColoHelper cluster = servers.get(0);
+ // HDFS handle for that colo, used to stage input data.
+ FileSystem clusterFS = serverFS.get(0);
+ String baseTestDir = baseHDFSDir + "/RescheduleProcessInFinalStates";
+ String aggregateWorkflowDir = baseTestDir + "/aggregator";
+ String inputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private static final Logger logger = Logger.getLogger(RescheduleProcessInFinalStatesTest.class);
+
+ /**
+ * Uploads the aggregator workflow and stages minute-granularity input data on HDFS,
+ * once for the whole class.
+ */
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ logger.info("in @BeforeClass");
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+ // Throwaway bundle used only to derive the feed data path prefix.
+ Bundle b = BundleUtil.readELBundle();
+ b.generateUniqueBundle();
+ b = new Bundle(b, cluster);
+ b.setProcessWorkflow(aggregateWorkflowDir);
+
+ String startDate = "2010-01-01T20:00Z";
+ String endDate = "2010-01-03T01:04Z";
+
+ b.setInputFeedDataPath(inputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ // Create data folders every 20 minutes across the window (plus margins on either side).
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+
+ /**
+ * Prepares and schedules a fresh process (5-min periodicity, 15-min validity, concurrency 6)
+ * before each test method.
+ */
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(inputPath);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:15Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(
+ baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ }
+
+ /** Removes entities submitted during the test. */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Wait till process succeed and delete it. Check that entity is absent on server. Reschedule
+ * it and check that it succeeds after some time.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = true)
+ public void rescheduleSucceeded() throws Exception {
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ checkNotFoundDefinition(bundles[0].getProcessData());
+
+ //submit and schedule process again
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+
+ /**
+ * Fully duplicates rescheduleSucceeded(), hence disabled until reworked.
+ * TODO : modify test to match test case
+ * Intended scenario: make process run into FAILED state. Delete it and check that entity was
+ * removed. Run it again and check that process succeeds.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = false)
+ public void rescheduleFailed() throws Exception {
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ checkNotFoundDefinition(bundles[0].getProcessData());
+
+ //submit and schedule process again
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+
+ /**
+ * Drive the process into DONEWITHERROR state (kill one instance so the bundle finishes
+ * "Done With Error" in Oozie). Delete it. Check that entity is absent on the server.
+ * Reschedule it and check that it succeeds in some time.
+ *
+ * @throws Exception
+ */
+ @Test(enabled = true)
+ public void rescheduleDWE() throws Exception {
+ // Kill the first instance so the bundle ends DONEWITHERROR rather than SUCCEEDED.
+ prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z");
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR);
+
+ //delete the process
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ checkNotFoundDefinition(bundles[0].getProcessData());
+
+ //submit and schedule process again
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+
+ /**
+ * Delete the scheduled process so its bundle runs into KILLED state. Check that the entity is
+ * absent on the server. Reschedule it and check that it succeeds in some time.
+ **/
+ @Test(enabled = true)
+ public void rescheduleKilled() throws Exception {
+ // Deleting a scheduled process kills its Oozie bundle.
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.KILLED);
+ checkNotFoundDefinition(bundles[0].getProcessData());
+
+ //submit and schedule process again
+ AssertUtil.assertSucceeded(prism.getProcessHelper()
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+
+ /**
+ * Tries to get entity definition and checks it is absent (-get definition should return
+ * process not found)
+ *
+ * @param process process entity definition
+ * @throws URISyntaxException
+ * @throws IOException
+ * @throws AuthenticationException
+ * @throws JAXBException
+ */
+ private void checkNotFoundDefinition(String process)
+ throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+ ServiceResponse r = prism.getProcessHelper()
+ .getEntityDefinition(URLS.GET_ENTITY_DEFINITION, process);
+ Assert.assertTrue(r.getMessage().contains("(process) not found"));
+ AssertUtil.assertFailed(r);
+ }
+}