You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:50 UTC
[04/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
deleted file mode 100644
index 03bc358..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ /dev/null
@@ -1,1182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.PolicyType;
-import org.apache.falcon.entity.v0.process.Retry;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.MatrixUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests with Retries.
- */
-@Test(groups = "embedded")
-public class NewRetryTest extends BaseTestClass {
-
- private static final Logger LOGGER = Logger.getLogger(NewRetryTest.class);
- private ColoHelper cluster = servers.get(0); // first entry of servers (presumably inherited from BaseTestClass)
- private FileSystem clusterFS = serverFS.get(0); // first entry of serverFS; used for all HDFS operations below
- private OozieClient clusterOC = serverOC.get(0); // first entry of serverOC; used for all Oozie queries below
-
- private DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm"); // slash-separated, minute-resolution, 24-hour pattern
- private final String baseTestDir = cleanAndGetTestDir(); // root directory for this class's test data (helper not visible here)
- private final String aggregateWorkflowDir = baseTestDir + "/aggregator"; // upload target used by uploadWorkflow()
- private final String lateDir = baseTestDir + "/lateDataTest/testFolders"; // directory that is wiped and refilled with input data by the tests
- private final String latePath = lateDir + MINUTE_DATE_PATTERN; // lateDir plus the MINUTE_DATE_PATTERN suffix; used as the input feed path
- private DateTime startDate; // process validity start, assigned in setUp()
- private DateTime endDate; // process validity end, assigned in setUp()
-
- /**
-  * Uploads the local Oozie resources directory to the aggregator workflow
-  * directory on the cluster file system once, before the tests of this class run.
-  *
-  * @throws Exception if the upload fails
-  */
- @BeforeClass(alwaysRun = true)
- public void uploadWorkflow() throws Exception {
-     final String localWorkflowResources = OSUtil.RESOURCES_OOZIE;
-     HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, localWorkflowResources);
- }
-
- /**
-  * Prepares bundles[0] for a test: reads the retry bundle, makes it unique, points the
-  * process at the uploaded workflow, gives it a validity window starting one minute and
-  * ending two minutes from now (UTC), swaps the input feed for one that uses the late-data
-  * path with an 8 minute late cut-off, sets the output location and submits the clusters.
-  *
-  * @throws Exception if the bundle cannot be prepared or the clusters cannot be submitted
-  */
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
-     final Bundle bundle = new Bundle(BundleUtil.readRetryBundle(), cluster);
-     bundles[0] = bundle;
-     bundle.generateUniqueBundle(this);
-     bundle.setProcessWorkflow(aggregateWorkflowDir);
-
-     // validity window: [now + 1 min, now + 2 min] in UTC
-     startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1);
-     endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
-     bundle.setProcessValidity(
-         TimeUtil.dateToOozieDate(startDate.toDate()),
-         TimeUtil.dateToOozieDate(endDate.toDate()));
-
-     // replace the stock input feed with a late-data aware copy
-     FeedMerlin lateInputFeed = new FeedMerlin(bundle.getInputFeedFromBundle());
-     lateInputFeed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
-     bundle.getDataSets().remove(bundle.getInputFeedFromBundle());
-     bundle.getDataSets().add(lateInputFeed.toString());
-
-     bundle.setOutputFeedLocationData(baseTestDir + "/output" + MINUTE_DATE_PATTERN);
-     bundle.submitClusters(prism);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities(); // helper not visible here; presumably removes the entities this test submitted - TODO confirm
- }
-
- /**
-  * Submits and schedules a process with the given retry policy, waits until part of it has
-  * started, then sets the attempts to zero and sends an update. Asserts that no new bundle
-  * was created and validates retries against the attempt count read before the update.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = true)
- public void testRetryInProcessZeroAttemptUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     // submit the process; everything after the guard only applies to a valid policy
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     boolean policyIsValid =
-         retry.getAttempts() > 0 && retry.getDelay().getFrequencyAsInt() > 0;
-     if (!policyIsValid) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     // recreate the input data from scratch
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-
-     String originalBundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-     waitTillCertainPercentageOfProcessHasStarted(clusterOC, originalBundleId, 25);
-
-     // remember the attempts in force before the policy is zeroed
-     int attemptsBeforeUpdate = bundle.getProcessObject().getRetry().getAttempts();
-     retry.setAttempts(0);
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     prism.getProcessHelper().update(bundle.getProcessData(), bundle.getProcessData());
-
-     String latestBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-     Assert.assertEquals(originalBundleId, latestBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried as configured before the update
-     validateRetry(clusterOC, originalBundleId, attemptsBeforeUpdate);
-     checkIfRetriesWereTriggeredCorrectly(cluster, retry, originalBundleId);
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits for the first failure
-  * retry, then lowers the attempts by two and, if the lowered value is still positive,
-  * updates the process. Asserts that the update succeeded without creating a new bundle and
-  * validates retries against the lowered attempt count.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = true)
- public void testRetryInProcessLowerAttemptUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     // poll (up to 20 x 10s) until at least one failure retry is observed
-     for (int attempt = 0;
-          attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
-         TimeUtil.sleepSeconds(10);
-     }
-     Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-         "Failure Retry validation failed");
-
-     // Compute the lowered value once. Previously the guard and the validation subtracted
-     // two a second time from the already lowered policy, i.e. they used (original - 4).
-     final int loweredAttempts = retry.getAttempts() - 2;
-     retry.setAttempts(loweredAttempts);
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-
-     if (loweredAttempts > 0) {
-         Assert.assertTrue(prism.getProcessHelper()
-                 .update(bundle.getProcessData(), bundle.getProcessData())
-                 .getMessage().contains("updated successfully"),
-             "process was not updated successfully");
-         String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-             bundle.getProcessName(), EntityType.PROCESS);
-
-         Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-         // validate that failed instances were retried according to the lowered policy
-         validateRetry(clusterOC, bundleId, loweredAttempts);
-         if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-             checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-         }
-     }
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits for the first failure
-  * retry, lowers the attempts by one and updates the process. Asserts that the update
-  * succeeded without creating a new bundle and validates retries against the lowered
-  * attempt count.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessLowerManageableAttemptUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     // poll (up to 10 x 10s) until at least one failure retry is observed
-     for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) {
-         TimeUtil.sleepSeconds(10);
-     }
-     Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-         "Failure Retry validation failed");
-
-     // Compute the lowered value once. Previously the validation subtracted one a second
-     // time from the already lowered policy, i.e. it expected (original - 2) retries.
-     final int loweredAttempts = retry.getAttempts() - 1;
-     retry.setAttempts(loweredAttempts);
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     Assert.assertTrue(prism.getProcessHelper()
-             .update(bundle.getProcessData(), bundle.getProcessData())
-             .getMessage().contains("updated successfully"),
-         "process was not updated successfully");
-     String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-
-     Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried according to the lowered policy
-     validateRetry(clusterOC, bundleId, loweredAttempts);
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessLowerBoundaryAttemptUpdate(Retry retry) throws Exception {
-
- for (String data : bundles[0].getDataSets()) {
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
- }
- bundles[0].setRetry(retry);
-
- //submit and schedule process
- ServiceResponse response =
- prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
- if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
- AssertUtil.assertFailed(response);
- } else {
- AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-
- //now wait till the process is over
- String bundleId = OozieUtil.getBundles(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
- for (int attempt = 0;
- attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) {
- TimeUtil.sleepSeconds(10);
- }
- Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 2),
- "Failure Retry validation failed");
-
-
- retry.setAttempts((2));
-
- bundles[0].setRetry(retry);
-
- LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
- Assert.assertTrue(
- prism.getProcessHelper()
- .update((bundles[0].getProcessData()), bundles[0].getProcessData())
- .getMessage().contains("updated successfully"),
- "process was not updated successfully");
- String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS);
-
- Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
- //now to validate all failed instances to check if they were retried or not.
- validateRetry(clusterOC, bundleId, 2);
- if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
- checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
- }
- }
- }
-
- /**
-  * Submits and schedules a process with the given retry policy, waits until part of it has
-  * started, then sets the attempts to four and updates the process. Asserts that the update
-  * succeeded without creating a new bundle and validates retries against four attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-     retry.setAttempts(4);
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     // Send the modified process definition as the update payload. The previous call passed
-     // the process name and a null definition, unlike the enabled tests in this class,
-     // which pass the process definition for both arguments.
-     Assert.assertTrue(prism.getProcessHelper()
-             .update(bundle.getProcessData(), bundle.getProcessData())
-             .getMessage().contains("updated successfully"),
-         "process was not updated successfully");
-     String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-
-     Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried according to the updated policy
-     validateRetry(clusterOC, bundleId, 4);
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits until part of it has
-  * started, then raises the retry delay by one minute and updates the process. Asserts that
-  * the update succeeded without creating a new bundle and validates retries against the
-  * process's configured attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessHigherDelayUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-     // Use the numeric accessor: getFrequency() yields a String, so "+ 1" used to append the
-     // character '1' (e.g. 3 -> "31") instead of adding one minute.
-     final int higherDelayMinutes = retry.getDelay().getFrequencyAsInt() + 1;
-     retry.setDelay(new Frequency("minutes(" + higherDelayMinutes + ")"));
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     // pass the process definition for both arguments, as the enabled tests in this class do
-     Assert.assertTrue(prism.getProcessHelper()
-             .update(bundle.getProcessData(), bundle.getProcessData())
-             .getMessage().contains("updated successfully"),
-         "process was not updated successfully");
-     String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-
-     Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried as configured
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits until part of it has
-  * started, then lowers the retry delay by one minute and updates the process. Asserts that
-  * the update succeeded without creating a new bundle and validates retries against the
-  * process's configured attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessLowerDelayUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-     final int lowerDelayMinutes = Integer.parseInt(retry.getDelay().getFrequency()) - 1;
-     retry.setDelay(new Frequency("minutes(" + lowerDelayMinutes + ")"));
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     // pass the process definition for both arguments, as the enabled tests in this class do
-     // (the previous call passed the bare process name as the first argument)
-     Assert.assertTrue(prism.getProcessHelper()
-             .update(bundle.getProcessData(), bundle.getProcessData())
-             .getMessage().contains("updated successfully"),
-         "process was not updated successfully");
-     String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-
-     Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried as configured
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits until part of it has
-  * started, then sets the retry delay to zero minutes and attempts an update. Asserts that
-  * the update is NOT reported as successful, that no new bundle was created, and validates
-  * retries against the process's configured attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInProcessZeroDelayUpdate(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-     bundle.setRetry(retry);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25);
-
-     retry.setDelay(new Frequency("minutes(0)"));
-     bundle.setRetry(retry);
-
-     LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC));
-     // Pass the process definition for both arguments, as the enabled tests in this class
-     // do, so that a rejection is caused by the zero delay and not by the call shape.
-     Assert.assertFalse(prism.getProcessHelper()
-             .update(bundle.getProcessData(), bundle.getProcessData())
-             .getMessage().contains("updated successfully"),
-         "process was updated successfully!!!");
-     String newBundleId = OozieUtil.getLatestBundleID(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS);
-
-     Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!");
-
-     // validate that failed instances were retried as configured
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
-
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInSimpleFailureCase(Retry retry) throws Exception {
-
- bundles[0].setRetry(retry);
-
- for (String data : bundles[0].getDataSets()) {
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
- }
-
- bundles[0].setProcessLatePolicy(null);
- //submit and schedule process
- ServiceResponse response =
- prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
- if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
- AssertUtil.assertFailed(response);
- } else {
- AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(bundles[0].getProcessData()));
- //now wait till the process is over
- String bundleId = OozieUtil.getBundles(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
- //now to validate all failed instances to check if they were retried or not.
- validateRetry(clusterOC, bundleId,
- bundles[0].getProcessObject().getRetry().getAttempts());
- if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
- checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
- }
- }
- }
-
-
- /**
-  * Submits and schedules a process with the given retry policy, waits for the first failure
-  * retry, then fires a user rerun over the process validity window and validates retries
-  * against the process's configured attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testUserRetryWhileAutomaticRetriesHappen(Retry retry) throws Exception {
-     // "HH" is the 24-hour hour-of-day; the previous "hh" is the 12-hour clock hour and
-     // produced a wrong rerun window for any time from 12:00 UTC onwards.
-     DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-     final Bundle bundle = bundles[0];
-
-     bundle.setRetry(retry);
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-
-     LOGGER.info("process dates: " + startDate + "," + endDate);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     // poll (up to 10 x 10s) until at least one failure retry is observed
-     for (int attempt = 0;
-          attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) {
-         TimeUtil.sleepSeconds(10);
-     }
-     Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1),
-         "Failure Retry validation failed");
-
-     // fire one user rerun while the automatic retries are still in progress
-     LOGGER.info("now firing user reruns:");
-     String rerunWindow =
-         "?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z"
-             + "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z";
-     prism.getProcessHelper().getProcessInstanceRerun(bundle.getProcessName(), rerunWindow);
-
-     // validate that failed instances were retried as configured
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
- /**
-  * Submits and schedules a process with the given retry policy, validates the automatic
-  * retries, then fires a user rerun spanning the failure time boundaries and validates
-  * retries again against the configured attempts plus one.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testUserRetryAfterAutomaticRetriesHappen(Retry retry) throws Exception {
-     // "HH" is the 24-hour hour-of-day; the previous "hh" is the 12-hour clock hour and
-     // produced a wrong rerun window for any time from 12:00 onwards.
-     DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
-     final Bundle bundle = bundles[0];
-
-     bundle.setRetry(retry);
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-
-     LOGGER.info("process dates: " + startDate + "," + endDate);
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String bundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     // first validate the automatic retries
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-
-     LOGGER.info("now firing user reruns:");
-
-     // rerun everything between the first and the last failure boundary
-     DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId);
-     String rerunWindow =
-         "?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z"
-             + "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1])
-             .replace("/", "T") + "Z";
-     InstancesResult piResult = prism.getProcessHelper()
-         .getProcessInstanceRerun(bundle.getProcessName(), rerunWindow);
-
-     AssertUtil.assertSucceeded(piResult);
-
-     // the user rerun is expected to add one more attempt on top of the automatic ones
-     validateRetry(clusterOC, bundleId,
-         bundle.getProcessObject().getRetry().getAttempts() + 1);
-
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-     }
- }
-
- /**
-  * Replaces the input feed with a late-data feed (10 minute late cut-off), submits and
-  * schedules the process with the given retry policy, waits for the first failure retry and
-  * suspends the process. Records the per-workflow failure retry counts, sleeps ten minutes,
-  * and asserts the counts are unchanged. Then resumes the process and validates retries
-  * against the process's configured attempts.
-  * When the policy has non-positive attempts or delay, the submission itself must fail.
-  *
-  * @param retry retry policy supplied by the "DP" data provider
-  * @throws Exception on any failure while talking to the cluster
-  */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInSuspendedAndResumeCaseWithLateData(Retry retry) throws Exception {
-     final Bundle bundle = bundles[0];
-
-     // swap the input feed for a late-data aware copy
-     FeedMerlin lateInputFeed = new FeedMerlin(bundle.getInputFeedFromBundle());
-     lateInputFeed.setFeedPathValue(latePath);
-     lateInputFeed.insertLateFeedValue(new Frequency("minutes(10)"));
-     bundle.getDataSets().remove(bundle.getInputFeedFromBundle());
-     bundle.getDataSets().add(lateInputFeed.toString());
-     bundle.setRetry(retry);
-
-     for (String feedXml : bundle.getDataSets()) {
-         AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
-     }
-
-     ServiceResponse submitResponse =
-         prism.getProcessHelper().submitEntity(bundle.getProcessData());
-     boolean policyIsValid =
-         retry.getAttempts() > 0 && retry.getDelay().getFrequencyAsInt() > 0;
-     if (!policyIsValid) {
-         AssertUtil.assertFailed(submitResponse);
-         return;
-     }
-     AssertUtil.assertSucceeded(submitResponse);
-
-     HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-     HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-     AssertUtil.assertSucceeded(
-         prism.getProcessHelper().schedule(bundle.getProcessData()));
-     String scheduledBundleId = OozieUtil.getBundles(clusterOC,
-         bundle.getProcessName(), EntityType.PROCESS).get(0);
-
-     // poll (up to 10 times, 10s apart) for the coordinator start times
-     List<DateTime> coordinatorStarts = null;
-     int lookups = 0;
-     while (lookups < 10 && coordinatorStarts == null) {
-         coordinatorStarts =
-             OozieUtil.getStartTimeForRunningCoordinators(cluster, scheduledBundleId);
-         TimeUtil.sleepSeconds(10);
-         ++lookups;
-     }
-     Assert.assertNotNull(coordinatorStarts, String
-         .format("Start time for running coordinators of bundle: %s should not be null.",
-             scheduledBundleId));
-     LOGGER.info("Start time: " + formatter.print(startDate));
-     LOGGER.info("End time: " + formatter.print(endDate));
-     LOGGER.info("candidate nominal time:" + formatter.print(coordinatorStarts.get(0)));
-
-     // poll (up to 10 x 10s) until at least one failure retry is observed
-     int polls = 0;
-     while (polls < 10 && !validateFailureRetries(clusterOC, scheduledBundleId, 1)) {
-         TimeUtil.sleepSeconds(10);
-         ++polls;
-     }
-     Assert.assertTrue(validateFailureRetries(clusterOC, scheduledBundleId, 1),
-         "Failure Retry validation failed");
-
-     LOGGER.info("now suspending the process altogether....");
-     AssertUtil.assertSucceeded(
-         cluster.getProcessHelper().suspend(bundle.getProcessData()));
-
-     // snapshot of retry counts right after suspension
-     HashMap<String, Integer> retriesAtSuspend = getFailureRetriesForEachWorkflow(
-         clusterOC, getDefaultOozieCoordinator(clusterOC, scheduledBundleId));
-     LOGGER.info("saved state of workflow retries");
-     for (String workflowKey : retriesAtSuspend.keySet()) {
-         LOGGER.info(workflowKey + "," + retriesAtSuspend.get(workflowKey));
-     }
-
-     // give any (unwanted) retries time to show up while suspended
-     TimeUnit.MINUTES.sleep(10);
-
-     HashMap<String, Integer> retriesAfterWait = getFailureRetriesForEachWorkflow(
-         clusterOC, getDefaultOozieCoordinator(clusterOC, scheduledBundleId));
-     LOGGER.info("final state of process looks like:");
-     for (String workflowKey : retriesAfterWait.keySet()) {
-         LOGGER.info(workflowKey + "," + retriesAfterWait.get(workflowKey));
-     }
-
-     Assert.assertEquals(retriesAtSuspend.size(), retriesAfterWait.size(),
-         "a new workflow retried while process was suspended!!!!");
-     for (String workflowKey : retriesAtSuspend.keySet()) {
-         Assert.assertEquals(retriesAtSuspend.get(workflowKey),
-             retriesAfterWait.get(workflowKey),
-             "values are different for workflow: " + workflowKey);
-     }
-
-     LOGGER.info("now resuming the process...");
-     AssertUtil.assertSucceeded(
-         cluster.getProcessHelper().resume(bundle.getProcessData()));
-
-     // validate that failed instances were retried as configured
-     validateRetry(clusterOC, scheduledBundleId,
-         bundle.getProcessObject().getRetry().getAttempts());
-     if (bundle.getProcessObject().getRetry().getAttempts() > 0) {
-         checkIfRetriesWereTriggeredCorrectly(cluster, retry, scheduledBundleId);
-     }
- }
-
-
- /**
- * Exercises process retries when the input feed is configured for late data.
- *
- * <p>The bundle's input feed is replaced by a copy that points at {@code latePath} and carries a
- * late-arrival value computed by {@code getFrequency(retry)}. All feeds and the process are then
- * submitted. If the retry definition has non-positive attempts or delay the process submission
- * is expected to fail and the test ends there. Otherwise the process is scheduled, the test
- * waits for the configured retries, injects extra data into one input folder, and validates
- * the retries again.
- *
- * @param retry retry definition supplied by the "DP" data provider
- * @throws Exception if any helper or Oozie client call fails
- */
- @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
- public void testRetryInLateDataCase(Retry retry) throws Exception {
-
- // Build the late-data variant of the input feed.
- FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- feed.setFeedPathValue(latePath);
-
- feed.insertLateFeedValue(getFrequency(retry));
-
- // Swap the original input feed in the bundle for the modified copy.
- bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
- bundles[0].getDataSets().add(feed.toString());
-
- bundles[0].setRetry(retry);
-
- for (String data : bundles[0].getDataSets()) {
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
- }
-
- //submit and schedule process
- ServiceResponse response =
- prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-
- // A retry definition with non-positive attempts or delay must be rejected at submission.
- if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
- AssertUtil.assertFailed(response);
- } else {
- AssertUtil.assertSucceeded(response);
- // Recreate the late-data directory and remember what it contains before scheduling.
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
- List<String> initialData =
- Util.getHadoopDataFromDir(clusterFS, bundles[0].getInputFeedFromBundle(),
- lateDir);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(bundles[0].getProcessData()));
- String bundleId = OozieUtil.getBundles(clusterOC,
- bundles[0].getProcessName(), EntityType.PROCESS).get(0);
- List<DateTime> dates = null;
-
- // Poll up to 10 times for coordinator start times; note the 10 second sleep also runs
- // after the call that finally returns a non-null list.
- for (int i = 0; i < 10 && dates == null; ++i) {
- dates = OozieUtil.getStartTimeForRunningCoordinators(cluster, bundleId);
- TimeUtil.sleepSeconds(10);
- }
- Assert.assertNotNull(dates, String
- .format("Start time for running coordinators of bundle: %s should not be null.",
- bundleId));
-
- LOGGER.info("Start time: " + formatter.print(startDate));
- LOGGER.info("End time: " + formatter.print(endDate));
- LOGGER.info("candidate nominal time:" + formatter.print(dates.get(0)));
- DateTime now = dates.get(0);
-
- // NOTE(review): the two dates are compared as formatted strings, which matches
- // chronological order only if the formatter prints most-significant fields first - confirm.
- if (formatter.print(startDate).compareToIgnoreCase(formatter.print(dates.get(0))) > 0) {
- now = startDate;
- }
-
- //now wait till the process is over
- // Up to 10 polls, 10 seconds apart, followed by one final asserted check.
- for (int attempt = 0; attempt < 10 && !validateFailureRetries(
- clusterOC, bundleId, bundles[0].getProcessObject().getRetry().getAttempts());
- ++attempt) {
- TimeUtil.sleepSeconds(10);
- }
- Assert.assertTrue(
- validateFailureRetries(clusterOC, bundleId,
- bundles[0].getProcessObject().getRetry().getAttempts()),
- "Failure Retry validation failed");
-
- // Inject additional data into a folder from the initial listing that lies within the
- // five minutes following the chosen reference time.
- String insertionFolder =
- Util.findFolderBetweenGivenTimeStamps(now, now.plusMinutes(5), initialData);
- LOGGER.info("inserting data in folder " + insertionFolder + " at " + DateTime.now());
- HadoopUtil.injectMoreData(clusterFS, lateDir + insertionFolder,
- OSUtil.concat(OSUtil.OOZIE_EXAMPLE_INPUT_DATA, "lateData"));
- //now to validate all failed instances to check if they were retried or not.
- validateRetry(clusterOC, bundleId,
- bundles[0].getProcessObject().getRetry().getAttempts());
-
- if (bundles[0].getProcessObject().getRetry().getAttempts() > 0) {
- checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
- }
- }
- }
-
-
-    /**
-     * Verifies that deleting a process part-way through its retries stops further retries.
-     *
-     * <p>The input feed is re-pointed at {@code latePath} with a one-minute late value, then the
-     * feeds and the process are submitted. A retry definition with non-positive attempts or delay
-     * must be rejected at submission. Otherwise the process is scheduled, the test waits until
-     * half of the configured attempts have been made, deletes the process, sleeps for the time
-     * the remaining retries would have needed, and finally checks that the retry count is still
-     * at the half-way mark.
-     *
-     * @param retry retry definition supplied by the "DP" data provider
-     * @throws Exception if any helper or Oozie client call fails
-     */
-    @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
-    public void testRetryInDeleteAfterPartialRetryCase(Retry retry) throws Exception {
-
-        FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feed.setFeedPathValue(latePath);
-        feed.insertLateFeedValue(new Frequency("minutes(1)"));
-        bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
-        bundles[0].getDataSets().add(feed.toString());
-
-        bundles[0].setRetry(retry);
-
-        for (String data : bundles[0].getDataSets()) {
-            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data));
-        }
-
-        //submit and schedule process
-        ServiceResponse response =
-            prism.getProcessHelper().submitEntity(bundles[0].getProcessData());
-        if (retry.getAttempts() <= 0 || retry.getDelay().getFrequencyAsInt() <= 0) {
-            AssertUtil.assertFailed(response);
-            return;
-        }
-
-        AssertUtil.assertSucceeded(response);
-        HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
-        HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().schedule(bundles[0].getProcessData()));
-        //now wait till the process is over
-        String bundleId = OozieUtil.getBundles(clusterOC,
-            bundles[0].getProcessName(), EntityType.PROCESS).get(0);
-
-        final int attempts = bundles[0].getProcessObject().getRetry().getAttempts();
-        final int retriesBeforeDelete = attempts / 2;
-
-        validateRetry(clusterOC, bundleId, retriesBeforeDelete);
-
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().delete((bundles[0].getProcessData())));
-
-        // Sleep for as long as the retries that have not happened yet would have taken.
-        final long delayMinutes = retry.getDelay().getFrequencyAsInt();
-        final long minutesToWait;
-        if (retry.getPolicy() == PolicyType.EXP_BACKOFF) {
-            // Powers of two are computed with a shift: '^' is bitwise XOR in Java, so
-            // "(remaining) ^ 2" yields 0 for three attempts and the test would not wait at all.
-            // With the delay doubling on each retry (the model asserted by
-            // checkIfRetriesWereTriggeredCorrectly), retry k waits delay * 2^k, hence retries
-            // retriesBeforeDelete..attempts-1 need delay * (2^attempts - 2^retriesBeforeDelete).
-            minutesToWait = delayMinutes * ((1L << attempts) - (1L << retriesBeforeDelete));
-        } else {
-            minutesToWait = delayMinutes * (attempts - retriesBeforeDelete);
-        }
-        TimeUnit.MINUTES.sleep(minutesToWait);
-
-        //now to validate all failed instances to check if they were retried or not.
-        validateRetry(clusterOC, bundleId, retriesBeforeDelete);
-
-        if (attempts > 0) {
-            checkIfRetriesWereTriggeredCorrectly(cluster, retry, bundleId);
-        }
-    }
-
-
-    /**
-     * Waits for the bundle's DEFAULT coordinator to appear and then for its failed workflows to
-     * reach the expected retry count, asserting on both.
-     *
-     * <p>Each wait polls at most 60 times with a 10 second pause between polls.
-     *
-     * @param oozieClient client used to query Oozie
-     * @param bundleId id of the bundle under test
-     * @param maxNumberOfRetries retry count every failed workflow is expected to reach
-     * @throws Exception if an Oozie query fails
-     */
-    private void validateRetry(OozieClient oozieClient, String bundleId, int maxNumberOfRetries)
-        throws Exception {
-        // Phase 1: wait for the default coordinator to exist.
-        int coordinatorPolls = 0;
-        while (coordinatorPolls < 60
-            && getDefaultOozieCoordinator(oozieClient, bundleId) == null) {
-            TimeUtil.sleepSeconds(10);
-            coordinatorPolls++;
-        }
-        Assert.assertNotNull(getDefaultOozieCoordinator(oozieClient, bundleId),
-            "Unexpected value of defaultCoordinator");
-
-        // Phase 2: wait for the retries to be exhausted.
-        int retryPolls = 0;
-        while (retryPolls < 60
-            && !validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries)) {
-            LOGGER.info("desired state not reached, attempt number: " + retryPolls);
-            TimeUtil.sleepSeconds(10);
-            retryPolls++;
-        }
-        final boolean retriesDone =
-            validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries);
-        Assert.assertTrue(retriesDone, "all retries were not attempted correctly!");
-    }
-
-
-    /**
-     * Reports whether every action of the bundle's DEFAULT coordinator has either succeeded or
-     * failed after exactly the expected number of runs.
-     *
-     * <p>The coordinator is looked up through the supplied {@code oozieClient} (not through the
-     * class-level client), so the same client is used for the coordinator and its workflows.
-     *
-     * @param oozieClient client used to query Oozie
-     * @param bundleId id of the bundle under test
-     * @param maxNumberOfRetries expected run count of failed workflows; negative values are
-     *                           treated as 0
-     * @return {@code false} if the coordinator is missing, has no actions, has an action without
-     *         a workflow id, has a running workflow, or has a non-succeeded workflow with fewer
-     *         runs than expected; {@code true} otherwise
-     * @throws Exception if an Oozie query fails
-     */
-    private boolean validateFailureRetries(OozieClient oozieClient, String bundleId,
-                                           int maxNumberOfRetries) throws Exception {
-        final CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-        final int retryLimit = Math.max(maxNumberOfRetries, 0);
-        LOGGER.info("coordinator: " + coordinator);
-
-        if (coordinator == null || coordinator.getActions().isEmpty()) {
-            return false;
-        }
-        LOGGER.info("coordinator.getActions(): " + coordinator.getActions());
-
-        boolean allSettled = true;
-        for (CoordinatorAction action : coordinator.getActions()) {
-            // No workflow has been created for this action yet.
-            if (action.getExternalId() == null) {
-                return false;
-            }
-
-            final WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId());
-            LOGGER
-                .info("actionInfo: " + actionInfo + " actionInfo.getRun(): " + actionInfo.getRun());
-
-            final WorkflowJob.Status status = actionInfo.getStatus();
-            if (status == WorkflowJob.Status.SUCCEEDED) {
-                continue;
-            }
-            if (status == WorkflowJob.Status.RUNNING) {
-                // Still in flight, so the final run count is not known yet.
-                allSettled = false;
-                continue;
-            }
-
-            // Any other status is treated as a failure; it must never exceed the limit and is
-            // only settled once it has reached it. Keep scanning so every action is asserted.
-            Assert.assertTrue(actionInfo.getRun() <= retryLimit,
-                "The workflow exceeded the max number of retries specified for it!!!!");
-            if (actionInfo.getRun() < retryLimit) {
-                allSettled = false;
-            }
-        }
-        return allSettled;
-    }
-
-    /**
-     * Finds the coordinator of the given bundle whose application name contains "DEFAULT".
-     *
-     * @param oozieClient client used to query Oozie
-     * @param bundleId id of the bundle to inspect
-     * @return freshly fetched job info of the first matching coordinator, or {@code null} if the
-     *         bundle has no such coordinator
-     * @throws Exception if an Oozie query fails
-     */
-    public CoordinatorJob getDefaultOozieCoordinator(OozieClient oozieClient, String bundleId)
-        throws Exception {
-        final BundleJob bundle = oozieClient.getBundleJobInfo(bundleId);
-        CoordinatorJob defaultCoordinator = null;
-
-        for (CoordinatorJob candidate : bundle.getCoordinators()) {
-            if (!candidate.getAppName().contains("DEFAULT")) {
-                continue;
-            }
-            // Re-query by id so the caller gets the full coordinator record.
-            defaultCoordinator = oozieClient.getCoordJobInfo(candidate.getId());
-            break;
-        }
-        return defaultCoordinator;
-    }
-
-    /**
-     * Supplies one {@link Retry} per combination of delay, delay unit, policy and attempt count.
-     *
-     * @return rows holding a single {@code Retry} argument each
-     */
-    @DataProvider(name = "DP")
-    public Object[][] getData() {
-        // A delay of -1 is deliberately left out: it should be rejected when the value is set.
-        final Integer[] delays = {2, 0};
-        final String[] delayUnits = {"minutes"};
-        final String[] retryTypes = {"periodic", "exp-backoff"};
-        final Integer[] retryAttempts = {2, 0, 3};
-
-        final Object[][] combinations =
-            MatrixUtil.crossProduct(delays, delayUnits, retryTypes, retryAttempts);
-        final Object[][] testData = new Object[combinations.length][];
-
-        int row = 0;
-        for (Object[] combination : combinations) {
-            final Integer delay = (Integer) combination[0];
-            final String delayUnit = (String) combination[1];
-            final String retryType = (String) combination[2];
-            final Integer attempts = (Integer) combination[3];
-            testData[row] = new Object[]{getRetry(delay, delayUnit, retryType, attempts)};
-            row++;
-        }
-        return testData;
-    }
-
- private void waitTillCertainPercentageOfProcessHasStarted(OozieClient oozieClient,
- String bundleId, int percentage)
- throws Exception {
- OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
- CoordinatorJob defaultCoordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-
- // make sure default coordinator is not null before we proceed
- for (int i = 0; i < 120 && (defaultCoordinator == null || defaultCoordinator.getStatus()
- == CoordinatorJob.Status.PREP); ++i) {
- TimeUtil.sleepSeconds(10);
- defaultCoordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
- }
- Assert.assertNotNull(defaultCoordinator, "default coordinator is null");
- Assert.assertNotEquals(defaultCoordinator.getStatus(), CoordinatorJob.Status.PREP,
- "Unexpected state for coordinator job: " + defaultCoordinator.getId());
- int totalCount = defaultCoordinator.getActions().size();
-
- int percentageConversion = (percentage * totalCount) / 100;
-
- while (percentageConversion > 0) {
- int doneBynow = 0;
- for (CoordinatorAction action : defaultCoordinator.getActions()) {
- CoordinatorAction actionInfo = oozieClient.getCoordActionInfo(action.getId());
- if (actionInfo.getStatus() == CoordinatorAction.Status.RUNNING) {
- doneBynow++;
- }
- }
- if (doneBynow >= percentageConversion) {
- break;
- }
- TimeUtil.sleepSeconds(10);
- }
- }
-
-
- private HashMap<String, Integer> getFailureRetriesForEachWorkflow(OozieClient oozieClient,
- CoordinatorJob coordinator)
- throws Exception {
- HashMap<String, Integer> workflowRetryMap = new HashMap<>();
- for (CoordinatorAction action : coordinator.getActions()) {
-
- if (null == action.getExternalId()) {
- continue;
- }
-
- WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId());
- LOGGER.info("adding workflow " + actionInfo.getId() + " to the map");
- workflowRetryMap.put(actionInfo.getId(), actionInfo.getRun());
- }
- return workflowRetryMap;
- }
-
- private DateTime[] getFailureTimeBoundaries(OozieClient oozieClient, String bundleId)
- throws Exception {
- List<DateTime> dateList = new ArrayList<>();
-
- CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-
- for (CoordinatorAction action : coordinator.getActions()) {
- if (action.getExternalId() != null) {
-
- WorkflowJob jobInfo = oozieClient.getJobInfo(action.getExternalId());
- if (jobInfo.getRun() > 0) {
- dateList.add(new DateTime(jobInfo.getStartTime(), DateTimeZone.UTC));
- }
- }
- }
- Collections.sort(dateList);
- return dateList.toArray(new DateTime[dateList.size()]);
- }
-
-    /**
-     * Checks, for every non-succeeded action of the DEFAULT coordinator, that each retry was
-     * triggered the expected delay after the preceding run finished (5 second tolerance).
-     *
-     * <p>For the periodic policy the expected delay is constant; for any other policy it doubles
-     * after each retry. The delay is assumed to be expressed in minutes - hourly or daily delays
-     * are not feasible to check here. Finish and retry times are read through
-     * {@code Util.getInstanceFinishTimes} / {@code Util.getInstanceRetryTimes} and parsed as
-     * {@code HH:mm:ss}.
-     *
-     * @param coloHelper cluster helper providing the Oozie client and log access
-     * @param retry retry definition the process was submitted with
-     * @param bundleId id of the bundle under test
-     * @throws Exception if an Oozie query or a log lookup fails
-     */
-    private void checkIfRetriesWereTriggeredCorrectly(ColoHelper coloHelper, Retry retry,
-                                                      String bundleId) throws Exception {
-        final DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("HH:mm:ss");
-
-        final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient();
-        final CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId);
-        final boolean periodic = retry.getPolicy() == PolicyType.PERIODIC;
-
-        for (CoordinatorAction action : coordinator.getActions()) {
-            // Coordinator actions are looked up by their own id; getExternalId() is the id of
-            // the workflow the action launched and is only valid for workflow/log queries.
-            final CoordinatorAction coordAction = oozieClient.getCoordActionInfo(action.getId());
-            if (coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED) {
-                continue;
-            }
-
-            final String workflowId = action.getExternalId();
-            int expectedDelay = retry.getDelay().getFrequencyAsInt();
-            //first get data from logs:
-            final List<String> instanceRetryTimes =
-                Util.getInstanceRetryTimes(coloHelper, workflowId);
-            final List<String> instanceFinishTimes =
-                Util.getInstanceFinishTimes(coloHelper, workflowId);
-
-            LOGGER.info("finish times look like:");
-            for (String line : instanceFinishTimes) {
-                LOGGER.info(line);
-            }
-
-            LOGGER.info("retry times look like:");
-            for (String line : instanceRetryTimes) {
-                LOGGER.info(line);
-            }
-
-            LOGGER.info("checking timelines for retry type " + retry.getPolicy().value()
-                + " for delay " + expectedDelay + " for workflow id: " + workflowId);
-
-            // The last finish time has no retry following it, hence size() - 1.
-            for (int i = 0; i < instanceFinishTimes.size() - 1; i++) {
-                final DateTime finished = timeFormatter.parseDateTime(instanceFinishTimes.get(i));
-                final DateTime retried = timeFormatter.parseDateTime(instanceRetryTimes.get(i));
-
-                Assert.assertEquals(finished.plusMinutes(expectedDelay).getMillis(),
-                    retried.getMillis(),
-                    5000, "oops! this is out of expected delay range for workflow id "
-                        + workflowId);
-                if (!periodic) {
-                    // exponential back-off: the delay doubles after every retry
-                    expectedDelay *= 2;
-                }
-            }
-        }
-    }
-
- private Retry getRetry(int delay, String delayUnits, String retryType,
- int retryAttempts) {
- Retry retry = new Retry() {
- @Override
- public String toString(){
- return String.format("Frequency: %s; Attempts: %s; PolicyType: %s",
- this.getDelay(), this.getAttempts(), this.getPolicy());
- }
- };
- retry.setAttempts(retryAttempts);
- retry.setDelay(new Frequency(delayUnits + "(" + delay + ")"));
- retry.setPolicy(PolicyType.fromValue(retryType));
- return retry;
- }
-
-    /**
-     * Derives a frequency from the retry settings, in the retry delay's own time unit.
-     *
-     * <p>A delay or attempt count of 0 is treated as 1 and negative values are taken by absolute
-     * value. For exponential back-off the result is {@code delay * 2^attempts}; for any other
-     * policy it is {@code delay * attempts}.
-     *
-     * @param retry retry definition to derive the frequency from
-     * @return frequency of the form {@code unit(amount)}
-     */
-    private Frequency getFrequency(Retry retry) {
-        int delay = retry.getDelay().getFrequencyAsInt();
-        if (delay == 0) {
-            delay = 1;
-        }
-        int attempts = retry.getAttempts();
-        if (attempts == 0) {
-            attempts = 1;
-        }
-
-        if (retry.getPolicy() == PolicyType.EXP_BACKOFF) {
-            // 2^attempts via a shift: '^' is bitwise XOR in Java, so "2 ^ attempts" gives 0 for
-            // two attempts and 1 for three instead of 4 and 8.
-            delay = Math.abs(delay) * (1 << Math.abs(attempts));
-        } else {
-            delay = Math.abs(delay * attempts);
-        }
-
-        return new Frequency(retry.getDelay().getTimeUnit() + "(" + delay + ")");
-    }
-}
-
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
deleted file mode 100644
index 0711e8a..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.supportClasses.JmsMessageConsumer;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-
-/**
- * Tests for a process that declares no outputs: checks the JMS notifications published for its
- * succeeded instances.
- */
-@Test(groups = "embedded")
-public class NoOutputProcessTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String testDir = cleanAndGetTestDir();
-    private String inputPath = testDir + "/input" + MINUTE_DATE_PATTERN;
-    private String workflowForNoIpOp = cleanAndGetTestDir();
-    private static final Logger LOGGER = Logger.getLogger(NoOutputProcessTest.class);
-
-    /**
-     * Uploads the no-output workflow and populates the input feed location with data at
-     * 20 minute steps around 2010-01-03T00:00Z - 2010-01-03T03:00Z.
-     *
-     * @throws Exception if the upload or the data preparation fails
-     */
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        LOGGER.info("in @BeforeClass");
-        uploadDirToClusters(workflowForNoIpOp, OSUtil.concat(OSUtil.RESOURCES, "workflows", "aggregatorNoOutput"));
-        Bundle b = BundleUtil.readELBundle();
-        b.generateUniqueBundle(this);
-        b = new Bundle(b, cluster);
-        String startDate = "2010-01-03T00:00Z";
-        String endDate = "2010-01-03T03:00Z";
-        b.setInputFeedDataPath(inputPath);
-        String prefix = b.getFeedDataPathPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
-    }
-
-    /**
-     * Prepares a fresh bundle whose process runs every 5 minutes between 02:30 and 02:45, has
-     * neither outputs nor late-process settings, and schedules it.
-     *
-     * @throws Exception if bundle preparation or scheduling fails
-     */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0].generateUniqueBundle(this);
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].setProcessWorkflow(workflowForNoIpOp);
-        bundles[0].setInputFeedDataPath(inputPath);
-        bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
-        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        ProcessMerlin process = bundles[0].getProcessObject();
-        process.setOutputs(null);
-        process.setLateProcess(null);
-        bundles[0].submitFeedsScheduleProcess(prism);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Waits till process ends successfully. Check that JMS messages related to entities
-     * reflects info about succeeded process instances as expected.
-     * @throws Exception
-     */
-    @Test(enabled = true, groups = {"singleCluster"})
-    public void checkForJMSMsgWhenNoOutput() throws Exception {
-        LOGGER.info("attaching messageConsumer to: " + "FALCON.ENTITY.TOPIC");
-        JmsMessageConsumer messageConsumer =
-            new JmsMessageConsumer("FALCON.ENTITY.TOPIC", cluster.getClusterHelper().getActiveMQ());
-        messageConsumer.start();
-
-        try {
-            //wait for all the instances to complete
-            InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        } finally {
-            // Stop the consumer even when the wait fails, so it does not outlive the test.
-            messageConsumer.interrupt();
-        }
-        Util.printMessageData(messageConsumer);
-        Assert.assertEquals(messageConsumer.getReceivedMessages().size(), 3,
-            " Message for all the 3 instance not found");
-    }
-
-    /**
-     * Waits till process ends successfully. Checks that JMS messages related to entities
-     * and to particular process reflects info about succeeded process instances as expected.
-     * @throws Exception
-     */
-    @Test(enabled = true, groups = {"singleCluster"})
-    public void rm() throws Exception {
-        JmsMessageConsumer consumerEntityMsg =
-            new JmsMessageConsumer("FALCON.ENTITY.TOPIC", cluster.getClusterHelper().getActiveMQ());
-        JmsMessageConsumer consumerProcessMsg =
-            new JmsMessageConsumer("FALCON." + bundles[0].getProcessName(),
-                cluster.getClusterHelper().getActiveMQ());
-        consumerEntityMsg.start();
-        consumerProcessMsg.start();
-
-        try {
-            //wait for all the instances to complete
-            InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-        } finally {
-            // Stop both consumers even when the wait fails, so they do not outlive the test.
-            consumerEntityMsg.interrupt();
-            consumerProcessMsg.interrupt();
-        }
-        Util.printMessageData(consumerEntityMsg);
-        Util.printMessageData(consumerProcessMsg);
-        Assert.assertEquals(consumerEntityMsg.getReceivedMessages().size(), 3,
-            " Message for all the 3 instance not found");
-        Assert.assertEquals(consumerProcessMsg.getReceivedMessages().size(), 3,
-            " Message for all the 3 instance not found");
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
deleted file mode 100644
index b0480e9..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Test process with different frequency combinations.
- */
-@Test(groups = "embedded")
-public class ProcessFrequencyTest extends BaseTestClass {
- private static final Logger LOGGER = Logger.getLogger(ProcessFrequencyTest.class);
- private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- LOGGER.info("in @BeforeClass");
- HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Test Process submission with different frequency. Expecting process workflow to run
- * successfully.
- * @throws Exception
- */
- @Test(dataProvider = "generateProcessFrequencies")
- public void testProcessWithFrequency(final FreqType freqType, final int freqAmount)
- throws Exception {
- final String startDate = "2010-01-02T01:00Z";
- final String endDate = "2010-01-02T01:01Z";
- final String inputPath = baseTestHDFSDir + "/input/";
- bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
- bundles[0].setOutputFeedLocationData(
- baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
- bundles[0].setProcessPeriodicity(freqAmount, freqType.getFalconTimeUnit());
- bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
- bundles[0].setProcessValidity(startDate, endDate);
- HadoopUtil.deleteDirIfExists(inputPath, clusterFS);
- bundles[0].submitFeedsScheduleProcess(prism);
-
- //upload data
- final String startPath = inputPath + freqType.getFormatter().print(
- TimeUtil.oozieDateToDate(startDate));
- HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
-
- final String processName = bundles[0].getProcessName();
- //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccessWOInstances(r);
- }
-
- @DataProvider(name = "generateProcessFrequencies")
- public Object[][] generateProcessFrequencies() {
- return new Object[][] {
- {FreqType.MINUTELY, 2, },
- {FreqType.HOURLY, 3, },
- {FreqType.DAILY, 5, },
- {FreqType.MONTHLY, 7, },
- };
- }
-
- /**
- * Test Process submission with bad frequency. Expecting submissions to fails.
- * @throws Exception
- */
- @Test
- public void testProcessWithBadFrequency()
- throws Exception {
- final String startDate = "2010-01-02T01:00Z";
- final String endDate = "2010-01-02T01:01Z";
- final String inputPath = baseTestHDFSDir + "/input/";
- final FreqType freqType = FreqType.MINUTELY;
- bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
- bundles[0].setOutputFeedLocationData(
- baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
- bundles[0].submitClusters(prism);
- bundles[0].submitFeeds(prism);
-
- bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
- bundles[0].setProcessValidity(startDate, endDate);
- final ProcessMerlin processMerlin = bundles[0].getProcessObject();
- //a frequency can be bad in two ways - it can have bad amount or it can have bad unit
- //submit process with bad amount
- processMerlin.setFrequency(new Frequency("BadAmount", freqType.getFalconTimeUnit()));
- AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(processMerlin.toString()));
-
- //submit process with bad unit
- processMerlin.setFrequency(new Frequency("2993", freqType.getFalconTimeUnit()));
- final String process = processMerlin.toString();
- final String newProcess = process.replaceAll("minutes\\(2993\\)", "BadUnit(2993)");
- AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(newProcess));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
deleted file mode 100644
index 91d39a7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Process instance mixed colo tests.
- *
- * <p>The aggregator workflow is uploaded to the file systems of the first two configured
- * servers once per class. Before every test one bundle is generated per server, the cluster
- * colos are set from the corresponding cluster helpers, and both clusters are submitted.
- */
-@Test(groups = "embedded")
-public class ProcessInstanceColoMixedTest extends BaseTestClass {
-
-    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceColoMixedTest.class);
-
-    private final String baseTestHDFSDir = cleanAndGetTestDir();
-    /** Feed location template; {@code %d} is replaced with the feed number. */
-    private final String feedPath = baseTestHDFSDir + "/feed0%d" + MINUTE_DATE_PATTERN;
-    private final String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
-    private final ColoHelper cluster1 = servers.get(0);
-    private final ColoHelper cluster2 = servers.get(1);
-    private final FileSystem cluster1FS = serverFS.get(0);
-    private final FileSystem cluster2FS = serverFS.get(1);
-
-    /**
-     * Uploads the oozie workflow resources to the aggregator directory on both file systems.
-     *
-     * @throws Exception if either upload fails
-     */
-    @BeforeClass(alwaysRun = true)
-    public void prepareClusters() throws Exception {
-        LOGGER.info("in @BeforeClass");
-        HadoopUtil.uploadDir(cluster1FS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        HadoopUtil.uploadDir(cluster2FS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    /**
-     * Generates one unique bundle per cluster, sets colo and workflow on each, and submits
-     * both clusters.
-     *
-     * @throws Exception if bundle generation or cluster submission fails
-     */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        //generate bundles according to config files
-        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
-        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
-        bundles[0].generateUniqueBundle(this);
-        bundles[1].generateUniqueBundle(this);
-
-        //set cluster colos
-        bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
-        LOGGER.info("cluster b1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-        bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
-        LOGGER.info("cluster b2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
-
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
-        //submit 2 clusters
-        Bundle.submitCluster(bundles[0], bundles[1]);
-    }
-
-    /** Removes the entities created by the test that just ran. */
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Schedules two input feeds and one output feed across both clusters, runs a process whose
-     * validity on the two clusters overlaps only partially, and checks that instance status,
-     * suspend, resume, kill and rerun requests all succeed and return a non-null instance list.
-     *
-     * @throws Exception on any failure while preparing data or talking to the servers
-     */
-    @Test(timeOut = 12000000)
-    public void mixed01C1sC2sC1eC2e() throws Exception {
-        //take the bundle feeds and drop their cluster definitions; they are re-added below
-        FeedMerlin feed01 = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        FeedMerlin feed02 = new FeedMerlin(bundles[1].getInputFeedFromBundle());
-        FeedMerlin outputFeed = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
-        feed01.clearFeedClusters();
-        feed02.clearFeedClusters();
-        outputFeed.clearFeedClusters();
-
-        //give each input feed its own data location
-        feed01.setFeedPathValue(String.format(feedPath, 1));
-        feed02.setFeedPathValue(String.format(feedPath, 2));
-
-        //generate data: feed01 on the first cluster's file system, feed02 on the second's
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
-            TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
-
-        String prefix = feed01.getFeedPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
-        HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
-
-        prefix = feed02.getFeedPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
-        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
-
-        String startTime = TimeUtil.getTimeWrtSystemTime(-70);
-
-        //feed01: first cluster is source, second is target
-        addFeedCluster(feed01, bundles[0], startTime, ClusterType.SOURCE);
-        addFeedCluster(feed01, bundles[1], startTime, ClusterType.TARGET);
-
-        //feed02: first cluster is target, second is source
-        addFeedCluster(feed02, bundles[0], startTime, ClusterType.TARGET);
-        addFeedCluster(feed02, bundles[1], startTime, ClusterType.SOURCE);
-
-        //output feed: first cluster is source, second is target
-        addFeedCluster(outputFeed, bundles[0], startTime, ClusterType.SOURCE);
-        addFeedCluster(outputFeed, bundles[1], startTime, ClusterType.TARGET);
-
-        //submit and schedule feeds
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed02.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed.toString()));
-
-        String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
-
-        //process validity: [start, start+35] on the first cluster, [start+16, start+45] on the second
-        ProcessMerlin process = bundles[0].getProcessObject();
-        process.clearProcessCluster();
-        process.addProcessCluster(
-            new ProcessMerlin.ProcessClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withValidity(processStartTime, TimeUtil.addMinsToTime(processStartTime, 35))
-                .build());
-        process.addProcessCluster(
-            new ProcessMerlin.ProcessClusterBuilder(
-                Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
-                    TimeUtil.addMinsToTime(processStartTime, 45))
-                .build());
-        process.addInputFeed(feed02.getName(), feed02.getName());
-
-        //submit and schedule process; fail here rather than in the waits below if it is rejected
-        AssertUtil.assertSucceeded(
-            prism.getProcessHelper().submitAndSchedule(process.toString()));
-
-        LOGGER.info("Wait till process goes into running ");
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), process.getName(), 1,
-            Status.RUNNING, EntityType.PROCESS);
-        InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
-            Status.RUNNING, EntityType.PROCESS);
-
-        final String fullRange =
-            rangeParams(processStartTime, TimeUtil.addMinsToTime(processStartTime, 45));
-        final String lateRange = rangeParams(TimeUtil.addMinsToTime(processStartTime, 37),
-            TimeUtil.addMinsToTime(processStartTime, 44));
-        final String earlyRange =
-            rangeParams(processStartTime, TimeUtil.addMinsToTime(processStartTime, 7));
-        final String secondClusterRange = rangeParams(
-            TimeUtil.addMinsToTime(processStartTime, 16),
-            TimeUtil.addMinsToTime(processStartTime, 45));
-
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceStatus(process.getName(), fullRange));
-
-        //suspend, then query, a window that lies only within the second cluster's validity
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceSuspend(process.getName(), lateRange));
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceStatus(process.getName(), lateRange));
-
-        //NOTE(review): resume targets start..start+7, not the window suspended above - confirm intent
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceResume(process.getName(), earlyRange));
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceStatus(process.getName(),
-                secondClusterRange));
-
-        //NOTE(review): kill goes through cluster1 directly while every other call uses prism - confirm
-        assertInstancesReturned(
-            cluster1.getProcessHelper().getProcessInstanceKill(process.getName(), earlyRange));
-        assertInstancesReturned(
-            prism.getProcessHelper().getProcessInstanceRerun(process.getName(), earlyRange));
-    }
-
-    /**
-     * Adds the first cluster of {@code bundle} to {@code feed} with the retention and validity
-     * end shared by every feed in this test.
-     *
-     * @param feed          feed to extend
-     * @param bundle        bundle whose first cluster definition supplies the cluster name
-     * @param validityStart start of the feed validity on that cluster
-     * @param clusterType   role of the cluster for this feed
-     */
-    private void addFeedCluster(FeedMerlin feed, Bundle bundle, String validityStart,
-                                ClusterType clusterType) {
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundle.getClusters().get(0)))
-                .withRetention("days(10000)", ActionType.DELETE)
-                .withValidity(validityStart, "2099-01-01T00:00Z")
-                .withClusterType(clusterType)
-                .build());
-    }
-
-    /**
-     * Builds the {@code ?start=...&end=...} parameter string passed to the instance calls.
-     *
-     * @param start range start
-     * @param end   range end
-     * @return the parameter string
-     */
-    private static String rangeParams(String start, String end) {
-        return "?start=" + start + "&end=" + end;
-    }
-
-    /**
-     * Asserts that an instance call succeeded and returned an instance list.
-     *
-     * @param result response of the instance call
-     */
-    private static void assertInstancesReturned(InstancesResult result) {
-        AssertUtil.assertSucceeded(result);
-        Assert.assertNotNull(result.getInstances(), "Instance list should not be null");
-    }
-}
-