You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:11 UTC
[12/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
new file mode 100644
index 0000000..f9f37cb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -0,0 +1,1713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Minutes;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * test for process update.
+ */
+@Test(groups = "distributed")
+public class NewPrismProcessUpdateTest extends BaseTestClass {
+
+ private String baseTestDir = baseHDFSDir + "/NewPrismProcessUpdateTest";
+ private String inputFeedPath = baseTestDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String workflowPath = baseTestDir + "/falcon-oozie-wf";
+ private String workflowPath2 = baseTestDir + "/falcon-oozie-wf2";
+ private String aggregatorPath = baseTestDir + "/aggregator";
+ private String aggregator1Path = baseTestDir + "/aggregator1";
+ private ColoHelper cluster1 = servers.get(0);
+ private ColoHelper cluster2 = servers.get(1);
+ private ColoHelper cluster3 = servers.get(2);
+ private FileSystem cluster1FS = serverFS.get(0);
+ private OozieClient cluster2OC = serverOC.get(1);
+ private OozieClient cluster3OC = serverOC.get(2);
+ private static final Logger LOGGER = Logger.getLogger(NewPrismProcessUpdateTest.class);
+
+ @BeforeMethod(alwaysRun = true)
+ public void testSetup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ Bundle b = BundleUtil.readUpdateBundle();
+ bundles[0] = new Bundle(b, cluster1);
+ bundles[1] = new Bundle(b, cluster2);
+ bundles[2] = new Bundle(b, cluster3);
+ setBundleWFPath(bundles[0], bundles[1], bundles[2]);
+ bundles[1].addClusterToBundle(bundles[2].getClusters().get(0),
+ ClusterType.TARGET, null, null);
+ usualGrind(cluster3, bundles[1]);
+ Util.restartService(cluster3.getClusterHelper());
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void setup() throws Exception {
+ for (String wfPath : new String[] { workflowPath, workflowPath2, aggregatorPath, aggregator1Path }) {
+ uploadDirToClusters(wfPath, OSUtil.RESOURCES_OOZIE);
+ }
+ Util.restartService(cluster3.getClusterHelper());
+ Util.restartService(cluster1.getClusterHelper());
+ Util.restartService(cluster2.getClusterHelper());
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessFrequencyInEachColoWithOneProcessRunningMonthly()
+ throws Exception {
+ final String startTIme = TimeUtil.getTimeWrtSystemTime(-20);
+ String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
+ bundles[1].setProcessPeriodicity(1, TimeUnit.months);
+ bundles[1].setOutputFeedPeriodicity(1, TimeUnit.months);
+ bundles[1].setProcessValidity(startTIme, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String updatedProcess = InstanceUtil
+ .setProcessFrequency(bundles[1].getProcessData(),
+ new Frequency("" + 5, TimeUnit.minutes));
+
+ LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
+
+ //now to update
+ while (Util
+ .parseResponse(prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), updatedProcess))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("update didn't SUCCEED in last attempt");
+ TimeUtil.sleepSeconds(10);
+ }
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+ Util.getProcessObject(updatedProcess).getFrequency());
+ TimeUtil.sleepSeconds(60);
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+ waitingForBundleFinish(cluster3, oldBundleId, 5);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ //failing due to falcon bug : https://issues.apache.org/jira/browse/FALCON-458
+ public void updateProcessRollStartTimeForwardInEachColoWithOneProcessRunning()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(7);
+ bundles[1].setProcessValidity(startTime, endTime);
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ List<String> oldNominalTimes =
+ OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+ String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), 20);
+ String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), 25);
+
+ bundles[1].setProcessValidity(newStartTime, newEndTime);
+ bundles[1].setProcessConcurrency(10);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), Util.prettyPrintXml(bundles[1]
+ .getProcessData())))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("update didn't SUCCEED in last attempt");
+ TimeUtil.sleepSeconds(10);
+ }
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+ bundles[1].getProcessName(), 0);
+ waitingForBundleFinish(cluster3, oldBundleId, 15);
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+ Assert.assertEquals(finalNumberOfInstances,
+ getExpectedNumberOfWorkflowInstances(TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd())));
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ int expectedNumberOfWorkflows =
+ getExpectedNumberOfWorkflowInstances(newStartTime, TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getEnd()));
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ expectedNumberOfWorkflows);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1800000)
+ public void updateProcessConcurrencyWorkflowExecutionInEachColoWithOneColoDown()
+ throws Exception {
+ //bundles[1].generateUniqueBundle();
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ TimeUtil.sleepSeconds(25);
+
+ int initialConcurrency = bundles[1].getProcessObject().getParallel();
+
+ bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
+ bundles[1].setProcessWorkflow(workflowPath2);
+ bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+
+ //stop cluster 3 where process is scheduled
+ Util.shutDownService(cluster3.getProcessHelper());
+
+ //now to update
+ AssertUtil.assertPartial(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ initialConcurrency);
+ Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
+ workflowPath);
+ Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
+ bundles[1].getProcessObject().getOrder());
+
+ String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(coloString).getWorkflow().getPath(),
+ workflowPath2);
+
+ Util.startService(cluster3.getProcessHelper());
+ dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData()))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("WARNING: update did not succeed, retrying ");
+ TimeUtil.sleepSeconds(20);
+ }
+ prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ initialConcurrency + 3);
+ Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
+ workflowPath2);
+ Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
+ bundles[1].getProcessObject().getOrder());
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ AssertUtil
+ .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ waitingForBundleFinish(cluster3, oldBundleId);
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+ int expectedInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+ Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+ "number of instances doesnt match :(");
+
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessFrequencyInEachColoWithOneProcessRunning() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-2);
+ String endTime = TimeUtil.getTimeWrtSystemTime(20);
+ bundles[1].setProcessValidity(startTime, endTime);
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+ LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+ String updatedProcess = InstanceUtil
+ .setProcessFrequency(bundles[1].getProcessData(),
+ new Frequency("" + 7, TimeUnit.minutes));
+
+ LOGGER.info("updated process: " + updatedProcess);
+
+ //now to update
+
+ ServiceResponse response =
+ prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ AssertUtil.assertSucceeded(response);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+ Util.getProcessObject(updatedProcess).getFrequency());
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ waitingForBundleFinish(cluster3, oldBundleId);
+
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessNameInEachColoWithOneProcessRunning() throws Exception {
+ //bundles[1].generateUniqueBundle();
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String originalProcessData = bundles[1].getProcessData();
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ TimeUtil.sleepSeconds(20);
+ List<String> oldNominalTimes =
+ OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ bundles[1].setProcessName("myNewProcessName");
+
+ //now to update
+ ServiceResponse response =
+ prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), bundles[1].getProcessData());
+ AssertUtil.assertFailed(response);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ originalProcessData, false, false);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessConcurrencyInEachColoWithOneProcessRunning()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-2);
+ String endTime = TimeUtil.getTimeWrtSystemTime(10);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ //bundles[1].generateUniqueBundle();
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ //now to update
+ DateTime updateTime = new DateTime(DateTimeZone.UTC);
+ TimeUtil.sleepSeconds(60);
+ List<String> oldNominalTimes =
+ OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+ LOGGER.info("updating at " + updateTime);
+ while (Util
+ .parseResponse(updateProcessConcurrency(bundles[1],
+ bundles[1].getProcessObject().getParallel() + 3))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("WARNING: update did not scceed, retyring ");
+ TimeUtil.sleepSeconds(20);
+ }
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ bundles[1].getProcessObject().getParallel() + 3);
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated
+ // correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(),
+ false, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ // future : should be verified using cord xml
+ Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ boolean doesExist = false;
+ while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+ &&
+ status != Job.Status.DONEWITHERROR) {
+ int statusCount = InstanceUtil
+ .getInstanceCountWithStatus(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()),
+ org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+ EntityType.PROCESS);
+ if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
+ doesExist = true;
+ break;
+ }
+ status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ Assert.assertNotNull(status,
+ "status must not be null!");
+ TimeUtil.sleepSeconds(30);
+ }
+
+ Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+ int expectedNumberOfInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ expectedNumberOfInstances);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessIncreaseValidityInEachColoWithOneProcessRunning() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(8);
+ bundles[1].setProcessValidity(startTime, endTime);
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ), 4);
+ bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()),
+ newEndTime);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ ServiceResponse response = prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData());
+ for (int i = 0; i < 10
+ &&
+ Util.parseResponse(response).getStatus() != APIResult.Status.SUCCEEDED; ++i) {
+ response = prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData());
+ TimeUtil.sleepSeconds(6);
+ }
+ Assert.assertEquals(Util.parseResponse(response).getStatus(),
+ APIResult.Status.SUCCEEDED, "Process update did not succeed.");
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), false, true);
+
+ int i = 0;
+
+ while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId)
+ != getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity()
+ .getEnd()))
+ && i < 10) {
+ TimeUtil.sleepSeconds(1);
+ i++;
+ }
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+ bundles[1].getProcessName(), 0);
+ waitingForBundleFinish(cluster3, oldBundleId);
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated
+ // correctly.
+ int finalNumberOfInstances = InstanceUtil
+ .getProcessInstanceList(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+ .size();
+ Assert.assertEquals(finalNumberOfInstances,
+ getExpectedNumberOfWorkflowInstances(TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd())));
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessConcurrencyInEachColoWithOneProcessSuspended()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(7);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ //now to update
+ while (Util
+ .parseResponse(updateProcessConcurrency(bundles[1],
+ bundles[1].getProcessObject().getParallel() + 3))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("WARNING: update did not scceed, retyring ");
+ TimeUtil.sleepSeconds(20);
+ }
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ bundles[1].getProcessObject().getParallel() + 3);
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), false, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ boolean doesExist = false;
+ while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+ &&
+ status != Job.Status.DONEWITHERROR) {
+ if (InstanceUtil
+ .getInstanceCountWithStatus(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()),
+ org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+ EntityType.PROCESS)
+ ==
+ bundles[1].getProcessObject().getParallel()) {
+ doesExist = true;
+ break;
+ }
+ status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ }
+
+ Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
+ bundles[1].getProcessName(), 0);
+
+ waitingForBundleFinish(cluster3, oldBundleId);
+
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+ int expectedInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+
+ Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+ "number of instances doesnt match :(");
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessConcurrencyInEachColoWithOneColoDown() throws Exception {
+
+ String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+ String endTime = TimeUtil.getTimeWrtSystemTime(5);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+
+ LOGGER.info("process to be scheduled: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ //now to update
+ Util.shutDownService(cluster3.getClusterHelper());
+
+ ServiceResponse response =
+ updateProcessConcurrency(bundles[1],
+ bundles[1].getProcessObject().getParallel() + 3);
+ AssertUtil.assertPartial(response);
+
+ Util.startService(cluster3.getClusterHelper());
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ bundles[1].getProcessObject().getParallel());
+
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+ Job.Status.RUNNING);
+
+ while (Util
+ .parseResponse(updateProcessConcurrency(bundles[1],
+ bundles[1].getProcessObject().getParallel() + 3))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("WARNING: update did not scceed, retyring ");
+ TimeUtil.sleepSeconds(20);
+ }
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+ Job.Status status =
+ OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ boolean doesExist = false;
+ while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
+ &&
+ status != Job.Status.DONEWITHERROR) {
+ if (InstanceUtil
+ .getInstanceCountWithStatus(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()),
+ org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+ EntityType.PROCESS)
+ ==
+ bundles[1].getProcessObject().getParallel() + 3) {
+ doesExist = true;
+ break;
+ }
+ status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ TimeUtil.sleepSeconds(30);
+ }
+ Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
+ OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
+ oldNominalTimes, Util.readEntityName(bundles[1].getProcessData()), false,
+ true
+ );
+
+ waitingForBundleFinish(cluster3, oldBundleId);
+
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+ int expectedInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+ Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+ "number of instances doesnt match :(");
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessRunning()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+ String endTime = TimeUtil.getTimeWrtSystemTime(7);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ int initialConcurrency = bundles[1].getProcessObject().getParallel();
+
+ bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
+ bundles[1].setProcessWorkflow(aggregator1Path);
+ bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+
+ //now to update
+
+ String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString();
+
+ LOGGER.info("updating @ " + updateTime);
+
+ while (Util.parseResponse(
+ prism.getProcessHelper().update((bundles[1].getProcessData()), bundles[1]
+ .getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) {
+ TimeUtil.sleepSeconds(10);
+ }
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ initialConcurrency + 3);
+ Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
+ aggregator1Path);
+ Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
+ bundles[1].getProcessObject().getOrder());
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ waitingForBundleFinish(cluster3, oldBundleId);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+ int expectedInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+ Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+ "number of instances doesnt match :(");
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessSuspended()
+ throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(2);
+ String endTime = TimeUtil.getTimeWrtSystemTime(6);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ int initialConcurrency = bundles[1].getProcessObject().getParallel();
+
+ bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
+ bundles[1].setProcessWorkflow(aggregator1Path);
+ bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
+ //suspend
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+
+ //now to update
+ String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString();
+ LOGGER.info("updating @ " + updateTime);
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ TimeUtil.sleepSeconds(10);
+ }
+
+ AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ String prismString = getResponse(prism, bundles[1].getProcessData(), true);
+ Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
+ initialConcurrency + 3);
+ Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
+ aggregator1Path);
+ Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
+ bundles[1].getProcessObject().getOrder());
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ waitingForBundleFinish(cluster3, oldBundleId);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+
+ int expectedInstances =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ TimeUtil
+ .dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters()
+ .get(0).getValidity()
+ .getEnd()));
+ Assert.assertEquals(finalNumberOfInstances, expectedInstances,
+ "number of instances doesnt match :(");
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessAddNewInputInEachColoWithOneProcessRunning() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+ String endTime = TimeUtil.getTimeWrtSystemTime(6);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ TimeUtil.sleepSeconds(20);
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+ String inputFeed = bundles[1].getInputFeedFromBundle();
+
+ bundles[1].addProcessInput(newFeedName, "inputData2");
+ inputFeed = Util.setFeedName(inputFeed, newFeedName);
+
+ LOGGER.info(inputFeed);
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, inputFeed));
+
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ TimeUtil.sleepSeconds(20);
+ }
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ waitingForBundleFinish(cluster3, oldBundleId);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(2);
+ String endTime = TimeUtil.getTimeWrtSystemTime(6);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+ String inputFeed = bundles[1].getInputFeedFromBundle();
+
+ bundles[1].addProcessInput(newFeedName, "inputData2");
+ inputFeed = Util.setFeedName(inputFeed, newFeedName);
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, inputFeed));
+
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ TimeUtil.sleepSeconds(10);
+ }
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+ AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ waitingForBundleFinish(cluster3, oldBundleId);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessAddNewInputInEachColoWithOneColoDown() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(10);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ String originalProcess = bundles[1].getProcessData();
+ String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
+ String inputFeed = bundles[1].getInputFeedFromBundle();
+ bundles[1].addProcessInput(newFeedName, "inputData2");
+ inputFeed = Util.setFeedName(inputFeed, newFeedName);
+ String updatedProcess = bundles[1].getProcessData();
+
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, originalProcess));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(originalProcess), EntityType.PROCESS);
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ //submit new feed
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, inputFeed));
+
+ Util.shutDownService(cluster3.getProcessHelper());
+
+ AssertUtil.assertPartial(
+ prism.getProcessHelper()
+ .update(updatedProcess, updatedProcess));
+
+ Util.startService(cluster3.getProcessHelper());
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ Assert.assertFalse(Util.isDefinitionSame(cluster2, prism, originalProcess));
+
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), false, false);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+ Job.Status.RUNNING);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ while (Util.parseResponse(
+ prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus()
+ != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("update didnt SUCCEED in last attempt");
+ TimeUtil.sleepSeconds(10);
+ }
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess));
+ bundles[1].verifyDependencyListing(cluster2);
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ updatedProcess, true, false);
+ waitingForBundleFinish(cluster3, oldBundleId);
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
+ Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessDecreaseValidityInEachColoWithOneProcessRunning() throws Exception {
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ), -2);
+ bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()),
+ newEndTime);
+ while (Util.parseResponse(
+ (prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData())))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("update didnt SUCCEED in last attempt");
+ TimeUtil.sleepSeconds(10);
+ }
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), false, true);
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ waitingForBundleFinish(cluster3, oldBundleId);
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ int finalNumberOfInstances = InstanceUtil
+ .getProcessInstanceList(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+ .size();
+ Assert.assertEquals(finalNumberOfInstances,
+ getExpectedNumberOfWorkflowInstances(bundles[1]
+ .getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart(),
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getEnd()));
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ int expectedNumberOfWorkflows =
+ getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getStart()),
+ newEndTime);
+ Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
+ expectedNumberOfWorkflows);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessIncreaseValidityInEachColoWithOneProcessSuspended() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(-1);
+ String endTime = TimeUtil.getTimeWrtSystemTime(3);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ TimeUtil.sleepSeconds(30);
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ), 4);
+ bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()),
+ newEndTime);
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ while (Util.parseResponse(
+ prism.getProcessHelper()
+ .update((bundles[1].getProcessData()), bundles[1].getProcessData()))
+ .getStatus() != APIResult.Status.SUCCEEDED) {
+ LOGGER.info("update didnt SUCCEED in last attempt");
+ TimeUtil.sleepSeconds(10);
+ }
+ AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+
+ dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ waitingForBundleFinish(cluster3, oldBundleId);
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ int finalNumberOfInstances = InstanceUtil
+ .getProcessInstanceList(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
+ .size();
+ Assert.assertEquals(finalNumberOfInstances,
+ getExpectedNumberOfWorkflowInstances(bundles[1]
+ .getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart(),
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getEnd()));
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ private void setBundleWFPath(Bundle... bundles) {
+ for (Bundle bundle : bundles) {
+ bundle.setProcessWorkflow(workflowPath);
+ }
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessFrequencyInEachColoWithOneProcessRunningDaily() throws Exception {
+ //set daily process
+ final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+ String endTime = TimeUtil.getTimeWrtSystemTime(4000);
+ bundles[1].setProcessPeriodicity(1, TimeUnit.days);
+ bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes =
+ OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+ LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+ String updatedProcess = InstanceUtil
+ .setProcessFrequency(bundles[1].getProcessData(),
+ new Frequency("" + 5, TimeUnit.minutes));
+
+ LOGGER.info("updated process: " + updatedProcess);
+
+ //now to update
+ ServiceResponse response =
+ prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ AssertUtil.assertSucceeded(response);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
+
+ String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData());
+ Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+ new Frequency("" + 5, TimeUnit.minutes));
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated
+ // correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void
+ updateProcessFrequencyInEachColoWithOneProcessRunningDailyToMonthlyWithStartChange()
+ throws Exception {
+ //set daily process
+ final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+ String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
+ bundles[1].setProcessPeriodicity(1, TimeUnit.days);
+ bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
+ bundles[1].setProcessValidity(startTime, endTime);
+
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
+
+ String updatedProcess = InstanceUtil
+ .setProcessFrequency(bundles[1].getProcessData(),
+ new Frequency("" + 1, TimeUnit.months));
+ updatedProcess = InstanceUtil
+ .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
+ endTime);
+
+ LOGGER.info("updated process: " + updatedProcess);
+
+ //now to update
+ ServiceResponse response =
+ prism.getProcessHelper().update(updatedProcess, updatedProcess);
+ AssertUtil.assertSucceeded(response);
+ String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData());
+ Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
+ new Frequency("" + 1, TimeUnit.months));
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessRollStartTimeBackwardsToPastInEachColoWithOneProcessRunning()
+ throws Exception {
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ TimeUtil.sleepSeconds(30);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
+ EntityType.PROCESS);
+
+ String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), -3);
+ bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ));
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, true);
+ bundles[1].verifyDependencyListing(cluster2);
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessRollStartTimeForwardInEachColoWithOneProcessSuspended()
+ throws Exception {
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData())
+ );
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ TimeUtil.sleepSeconds(30);
+
+ OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
+ String oldStartTime = TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ );
+ String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), 3);
+ bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ));
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData())
+ );
+
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+
+ dualComparison(prism, cluster2, bundles[1].getProcessData());
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ //ensure that the running process has new coordinators created; while the submitted
+ // one is updated correctly.
+ int finalNumberOfInstances =
+ InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
+ Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
+ Assert.assertEquals(finalNumberOfInstances,
+ getExpectedNumberOfWorkflowInstances(oldStartTime,
+ bundles[1].getProcessObject().getClusters().getClusters().get(0)
+ .getValidity().getEnd()));
+ Assert.assertEquals(InstanceUtil
+ .getProcessInstanceList(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
+ .size(), getExpectedNumberOfWorkflowInstances(newStartTime,
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
+
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(groups = { "multiCluster" }, timeOut = 1200000)
+ public void updateProcessRollStartTimeBackwardsInEachColoWithOneProcessSuspended()
+ throws Exception {
+ bundles[1].submitBundle(prism);
+ //now to schedule in 1 colo and let it remain in another
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ String oldBundleId = InstanceUtil
+ .getLatestBundleID(cluster3,
+ Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
+ TimeUtil.sleepSeconds(30);
+
+ String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getStart()
+ ), -3);
+ bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
+ bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
+ .getEnd()
+ ));
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
+
+ waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
+
+ AssertUtil.assertSucceeded(
+ cluster3.getProcessHelper()
+ .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper()
+ .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(cluster3.getProcessHelper()
+ .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+ List<String> oldNominalTimes =
+ OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
+
+ OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
+ bundles[1].getProcessData(), true, false);
+
+ bundles[1].verifyDependencyListing(cluster2);
+
+ dualComparison(prism, cluster3, bundles[1].getProcessData());
+ waitingForBundleFinish(cluster3, oldBundleId);
+
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ }
+
+ @Test(timeOut = 1200000)
+ public void
+ updateProcessWorkflowXml() throws URISyntaxException, JAXBException,
+ IOException, OozieClientException, AuthenticationException {
+ Bundle b = BundleUtil.readELBundle();
+ HadoopFileEditor hadoopFileEditor = null;
+ try {
+
+ b = new Bundle(b, cluster1);
+ b.setProcessWorkflow(workflowPath);
+ b.generateUniqueBundle();
+
+ b.setProcessValidity(TimeUtil.getTimeWrtSystemTime(-10),
+ TimeUtil.getTimeWrtSystemTime(15));
+ b.submitFeedsScheduleProcess(prism);
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster1, b.getProcessData(), 0, 10);
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+ b.getProcessName(), 0);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
+ Util.readEntityName(b.getProcessData()), 0, CoordinatorAction.Status.RUNNING,
+ EntityType.PROCESS);
+
+ //save old data
+ String oldBundleID = InstanceUtil
+ .getLatestBundleID(cluster1,
+ Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
+
+ List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
+ oldBundleID,
+ EntityType.PROCESS);
+
+ //update workflow.xml
+ hadoopFileEditor = new HadoopFileEditor(cluster1FS);
+ hadoopFileEditor.edit(new ProcessMerlin(b
+ .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
+ "<!-- some comment -->");
+
+ //update
+ prism.getProcessHelper().update(b.getProcessData(),
+ b.getProcessData());
+
+ TimeUtil.sleepSeconds(20);
+ //verify new bundle creation
+ OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
+ b.getProcessData(), true, true);
+
+ } finally {
+ b.deleteBundle(prism);
+ if (hadoopFileEditor != null) {
+ hadoopFileEditor.restore();
+ }
+ }
+
+ }
+
+ public ServiceResponse updateProcessConcurrency(Bundle bundle, int concurrency)
+ throws Exception {
+ String oldData = bundle.getProcessData();
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject());
+ updatedProcess.setParallel(concurrency);
+
+ return prism.getProcessHelper()
+ .update(oldData, updatedProcess.toString());
+ }
+
+ /**
+ * this method compares process xml definition from 2 falcon servers / prism and expects them to
+ * be identical. If the definitions are identical then the definition from @param coloHelper1
+ * is @return are response.
+ */
+ private String dualComparison(ColoHelper coloHelper1, ColoHelper coloHelper2,
+ String processData) throws Exception {
+ String colo1Response = getResponse(coloHelper1, processData, true);
+ String colo2Response = getResponse(coloHelper2, processData, true);
+ Assert.assertTrue(XmlUtil.isIdentical(colo1Response, colo2Response),
+ "Process definition should have been identical");
+ return getResponse(coloHelper1, processData, true);
+ }
+
+ /**
+ * this method compares process xml definition from 2 falcon servers / prism and expects them to
+ * be different.
+ */
+ private void dualComparisonFailure(ColoHelper coloHelper1, ColoHelper coloHelper2,
+ String processData) throws Exception {
+ Assert.assertFalse(XmlUtil.isIdentical(getResponse(coloHelper1, processData, true),
+ getResponse(coloHelper2, processData, true)), "Process definition should not have been "
+ + "identical");
+ }
+
+ private String getResponse(ColoHelper prism, String processData, boolean bool)
+ throws Exception {
+ ServiceResponse response = prism.getProcessHelper()
+ .getEntityDefinition(Util.URLS.GET_ENTITY_DEFINITION, processData);
+ if (bool) {
+ AssertUtil.assertSucceeded(response);
+ } else {
+ AssertUtil.assertFailed(response);
+ }
+ String result = response.getMessage();
+ Assert.assertNotNull(result);
+
+ return result;
+
+ }
+
+ private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle,
+ Job.Status state)
+ throws Exception {
+
+ while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
+ Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
+ //keep waiting
+ TimeUtil.sleepSeconds(10);
+ }
+
+ //now check if the coordinator is in desired state
+ CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil
+ .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
+ EntityType.PROCESS));
+
+ while (coord.getStatus() != state) {
+ TimeUtil.sleepSeconds(10);
+ coord = getDefaultOozieCoord(coloHelper, InstanceUtil
+ .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
+ EntityType.PROCESS));
+ }
+ }
+
+ private Bundle usualGrind(ColoHelper prism, Bundle b) throws Exception {
+ b.setInputFeedDataPath(inputFeedPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
+ HadoopUtil.lateDataReplenish(cluster1FS, 60, 1, prefix, null);
+ final String starTime = TimeUtil.getTimeWrtSystemTime(3);
+ String endTime = TimeUtil.getTimeWrtSystemTime(7);
+ b.setProcessPeriodicity(1, TimeUnit.minutes);
+ b.setOutputFeedPeriodicity(1, TimeUnit.minutes);
+ b.setProcessValidity(starTime, endTime);
+ return b;
+ }
+
+ private ExecutionType getRandomExecutionType(Bundle bundle) throws Exception {
+ ExecutionType current = bundle.getProcessObject().getOrder();
+ Random r = new Random();
+ ExecutionType[] values = ExecutionType.values();
+ int i;
+ do {
+
+ i = r.nextInt(values.length);
+ } while (current == values[i]);
+ return values[i];
+ }
+
+ public ServiceResponse updateProcessFrequency(Bundle bundle,
+ org.apache.falcon.entity.v0.Frequency frequency)
+ throws Exception {
+ String oldData = bundle.getProcessData();
+ ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject());
+ updatedProcess.setFrequency(frequency);
+ return prism.getProcessHelper()
+ .update(oldData, updatedProcess.toString());
+ }
+
+ //need to expand this function more later
+ private int getExpectedNumberOfWorkflowInstances(String start, String end) {
+ DateTime startDate = new DateTime(start);
+ DateTime endDate = new DateTime(end);
+ Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+ return minutes.getMinutes();
+ }
+
+ private int getExpectedNumberOfWorkflowInstances(Date start, Date end) {
+ DateTime startDate = new DateTime(start);
+ DateTime endDate = new DateTime(end);
+ Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+ return minutes.getMinutes();
+ }
+
+ private int getExpectedNumberOfWorkflowInstances(String start, Date end) {
+ DateTime startDate = new DateTime(start);
+ DateTime endDate = new DateTime(end);
+ Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
+ return minutes.getMinutes();
+ }
+
+ private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId, int minutes)
+ throws Exception {
+ int wait = 0;
+ while (!OozieUtil.isBundleOver(coloHelper, bundleId)) {
+ //keep waiting
+ LOGGER.info("bundle not over .. waiting");
+ TimeUtil.sleepSeconds(60);
+ wait++;
+ if (wait == minutes) {
+ Assert.assertTrue(false);
+ break;
+ }
+ }
+ }
+
+ private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId) throws Exception {
+ waitingForBundleFinish(coloHelper, bundleId, 15);
+ }
+
+ private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId)
+ throws Exception {
+ OozieClient client = coloHelper.getFeedHelper().getOozieClient();
+ BundleJob bundlejob = client.getBundleJobInfo(bundleId);
+
+ for (CoordinatorJob coord : bundlejob.getCoordinators()) {
+ if (coord.getAppName().contains("DEFAULT")) {
+ return client.getCoordJobInfo(coord.getId());
+ }
+ }
+ return null;
+ }
+
+}