You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:48 UTC
[02/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
deleted file mode 100644
index 14ecfe4..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction.Status;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Process instance status tests.
- */
-@Test(groups = "embedded")
-public class ProcessInstanceStatusTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
- private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
- private String feedInputTimedOutPath = baseTestHDFSDir + "/timedoutStatus"
- + MINUTE_DATE_PATTERN;
- private String feedOutputTimedOutPath =
- baseTestHDFSDir + "/output-data/timedoutStatus" + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class);
- private static final double TIMEOUT = 15;
- private String processName;
- private OozieClient clusterOC = serverOC.get(0);
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- LOGGER.info("in @BeforeClass");
- HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- /**
- * Configures a general process definition whose particular properties can be overwritten.
- */
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(feedInputPath);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- processName = bundles[0].getProcessName();
- HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Timeout is set to 3 minutes; -getStatus is requested for a large range in the past.
- * 6 instances should be materialized: one running and the others waiting.
- * Adding logging information test as part of FALCON-813.
- * In case status does not contain jobId of instance the test should fail.
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusStartAndEndCheckNoInstanceAfterEndDate()
- throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
- bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Perform -getStatus using only -start parameter within time-range of non-materialized
- * instances. There should be no instances returned in response.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusOnlyStartAfterMat() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
- bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T05:00Z");
- AssertUtil.assertSucceeded(r);
- Assert.assertEquals(r.getInstances(), null);
- }
-
- /**
- * Schedule process. Perform -getStatus using -end parameter which is out of process
- * validity range. Attempt should succeed with end defaulted to entity end.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusEndOutOfRange() throws Exception {
- HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
- InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0);
- }
-
- /**
- * Schedule process and try to -getStatus without date parameters. Attempt should succeed. Start defaults
- * to start of entity and end defaults to end of entity.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusDateEmpty()
- throws JAXBException, AuthenticationException, IOException, URISyntaxException,
- OozieClientException, InterruptedException {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z");
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5,
- Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
- InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule process with number of instances. Perform -getStatus request with valid
- * parameters. Attempt should succeed.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusStartAndEnd() throws Exception {
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 ,
- Status.RUNNING, EntityType.PROCESS);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule process. Perform -getStatus using -start parameter which is out of process
- * validity range. Attempt should succeed, with start defaulted to entity start time.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusStartOutOfRange() throws Exception {
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule and then delete process. Try to get the status of its instances. Attempt should
- * fail with an appropriate code.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusKilled() throws Exception {
- bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.assertSucceeded(prism.getProcessHelper().delete(bundles[0].getProcessData()));
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
- InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
- }
-
- /**
- * Schedule process and then suspend it. -getStatus of the first instance using only the -start parameter.
- * Instance should be suspended.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusOnlyStartSuspended() throws Exception {
- bundles[0].submitFeedsScheduleProcess(prism);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS);
- AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData()));
- TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z");
- Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED);
- Assert.assertEquals(InstanceUtil.instancesInResultWithStatus(r, WorkflowStatus.SUSPENDED), 1);
- }
-
- /**
- * Schedule process. Try to -getStatus using -start/-end parameters with values which are
- * reversed, i.e. -start is later than -end. Attempt should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusReverseDateRange() throws Exception {
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
- Status.RUNNING, EntityType.PROCESS);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:20Z&end=2010-01-02T01:07Z");
- InstanceUtil.validateError(r, ResponseErrors.START_BEFORE_SCHEDULED);
- }
-
- /**
- * Schedule process. Perform -getStatus using -start/-end parameters which are out of process
- * validity range. Attempt should succeed, with start/end defaulted to entity's start/end.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusStartEndOutOfRange() throws Exception {
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- bundles[0].setProcessConcurrency(2);
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
- Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
- InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule process. Suspend and then resume it. -getStatus of its instances. Check that
- * response reflects that instances are running.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusResumed() throws Exception {
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- String process = bundles[0].getProcessData();
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- Status.RUNNING, EntityType.PROCESS, 5);
- AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(process));
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.SUSPENDED, EntityType.PROCESS, 3);
- TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.assertSucceeded(prism.getProcessHelper().resume(process));
- TimeUtil.sleepSeconds(TIMEOUT);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- Status.RUNNING, EntityType.PROCESS, 5);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- }
-
- /**
- * Schedule process. -getStatus of its first instance using only -start parameter which
- * points to start time of process validity. Check that response reflects expected status of
- * instance.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusOnlyStart() throws Exception {
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z");
- InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule process. Try to perform -getStatus using valid -start parameter but invalid
- * process name. Attempt should fail with an appropriate status code.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusInvalidName() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
- bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus("invalidProcess", "?start=2010-01-01T01:00Z");
- InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
- }
-
- /**
- * Schedule process. Try to -getStatus without time range parameters. Attempt succeeds.
- * Adding logging information test as part of status information.
- * In case status does not contain jobId of instance the test should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusWoParams() throws Exception {
- bundles[0].setProcessConcurrency(5);
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].submitFeedsScheduleProcess(prism);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- Status.RUNNING, EntityType.PROCESS, 5);
- List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
- InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0);
- List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
- Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
- }
-
- /**
- * Schedule process with timeout set to 2 minutes. Wait till it becomes timed out. -getStatus
- * of that process. Check that all materialized instances are failed.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceStatusTimedOut() throws Exception {
- bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputTimedOutPath);
- bundles[0].setProcessConcurrency(3);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, Status.TIMEDOUT,
- EntityType.PROCESS);
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
- InstanceUtil.validateFailedInstances(r, 3);
- }
-
- /**
- * Check that default end time param value is now.
- */
- @Test
- public void testDefaultEndTimeParam()
- throws OozieClientException, IOException, InterruptedException, AuthenticationException, URISyntaxException,
- JAXBException {
- //set validity to have 12 instances
- String start = TimeUtil.getTimeWrtSystemTime(-60);
- String end = TimeUtil.getTimeWrtSystemTime(0);
- bundles[0].setProcessValidity(start, end);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(3);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- //make first 3 instances running
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, Status.RUNNING,
- EntityType.PROCESS);
- //check instances status with end, expected first 10 instances
- InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=" + start + "&end=" + TimeUtil.addMinsToTime(end, -11));
- InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0);
- //request the same but without end, expected to have the latest 10 instances
- r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=" + start);
- InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0);
- //the same with numResults which includes/excludes all running instances
- r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=" + start + "&end=" + TimeUtil.addMinsToTime(end, -16) + "&numResults=9");
- InstanceUtil.validateResponse(r, 9, 3, 0, 6, 0);
- //expected end is set to now, thus getting last 9 instances
- r = prism.getProcessHelper().getProcessInstanceStatus(processName,
- "?start=" + start + "&numResults=9");
- InstanceUtil.validateResponse(r, 9, 0, 0, 9, 0);
- }
-
- /*
- * Function to match the workflows obtained from instance status and oozie.
- */
- private boolean matchWorkflows(List<String> instanceWf, List<String> oozieWf) {
- Collections.sort(instanceWf);
- Collections.sort(oozieWf);
- if (instanceWf.size() != oozieWf.size()) {
- return false;
- }
- for (int index = 0; index < instanceWf.size(); index++) {
- if (!instanceWf.get(index).contains(oozieWf.get(index))) {
- return false;
- }
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
deleted file mode 100644
index 4a27a0a..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-
-
-/**
- * Process instance suspend tests.
- */
-@Test(groups = "embedded")
-public class ProcessInstanceSuspendTest extends BaseTestClass {
-
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
- private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private String processName;
- private OozieClient clusterOC = serverOC.get(0);
-
- /**
- * Prepares a fresh bundle with 5 minute input feed and process periodicity and remembers
- * the name of its process.
- *
- * @throws Exception
- */
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(feedInputPath);
- bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- processName = bundles[0].getProcessName();
- }
-
- /**
- * Removes entities created by the test and deletes the test directory.
- *
- * @throws IOException
- */
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws IOException {
- removeTestClassEntities();
- HadoopUtil.deleteDirIfExists(baseTestHDFSDir, clusterFS);
- }
-
- /**
- * Requests instance status of the process under test through prism.
- *
- * @param params query string of the request, e.g. "?start=...&end=..."
- * @return result of the instance status request
- * @throws Exception
- */
- private InstancesResult getStatus(String params) throws Exception {
- return prism.getProcessHelper().getProcessInstanceStatus(processName, params);
- }
-
- /**
- * Requests suspension of instances of the process under test through prism.
- *
- * @param params query string of the request, e.g. "?start=...&end=..."; may be null
- * @return result of the instance suspend request
- * @throws Exception
- */
- private InstancesResult suspend(String params) throws Exception {
- return prism.getProcessHelper().getProcessInstanceSuspend(processName, params);
- }
-
- /**
- * Schedule process. Try to suspend instances with start/end parameters which are
- * wider than process validity range. Succeeds.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendLargeRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- // both boundaries lie outside of the process validity set above
- suspend("?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
- result = getStatus("?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
- InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0);
- }
-
- /**
- * Schedule single-instance process. Wait till instance succeed. Try to suspend
- * succeeded instance. Action should be performed successfully as idempotent action.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendSucceeded() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- InstancesResult r = suspend("?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z");
- AssertUtil.assertSucceeded(r);
- }
-
- /**
- * Schedule process. Check that all instances are running. Suspend them. Check that all are
- * suspended. In every action valid time range is used.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendAll() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- suspend("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0);
- }
-
- /**
- * Schedule process and try to perform -suspend action without date range parameters.
- * Attempt should fail. Will fail because of jira : https://issues.apache.org/jira/browse/FALCON-710
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendWoParams() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = suspend(null);
- InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE);
- }
-
- /**
- * Schedule process with 3 running and 2 waiting instances expected. Suspend ones which are
- * running. Check that now 3 are suspended and 2 are still waiting.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendStartAndEnd() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessConcurrency(3);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- // dependencies are created only for the first three instances
- for (int instanceNumber = 0; instanceNumber < 3; instanceNumber++) {
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, instanceNumber);
- }
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
- InstanceUtil.validateResponse(result, 5, 3, 0, 2, 0);
- suspend("?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
- result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
- InstanceUtil.validateResponse(result, 5, 0, 3, 2, 0);
- }
-
- /**
- * Schedule process. Try to suspend instances using a process name which doesn't match it.
- * Action should fail with process not found error.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendNonExistent() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- // the request deliberately targets a name other than processName
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceSuspend("invalidName", "?start=2010-01-02T01:20Z");
- InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
- }
-
- /**
- * Schedule process. Perform -suspend action using only -start parameter which points to start
- * time of process. The request is expected to be rejected with unparseable date error.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendOnlyStart() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessConcurrency(3);
- bundles[0].submitFeedsScheduleProcess(prism);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult r = suspend("?start=2010-01-02T01:00Z");
- InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE);
- }
-
- /**
- * Schedule process. Perform -suspend action using only -end parameter.
- * Should fail with appropriate status message.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendOnlyEnd() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessConcurrency(3);
- bundles[0].submitFeedsScheduleProcess(prism);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult r = suspend("?end=2010-01-02T01:05Z");
- InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE);
- }
-
- /**
- * Schedule process with a number of instances running. Perform -suspend action using params
- * such that they aim to suspend the last instance. Check that only
- * the last instance is suspended.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void testProcessInstanceSuspendSuspendLast() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessConcurrency(5);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- InstancesResult result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- suspend("?start=2010-01-02T01:20Z&end=2010-01-02T01:23Z");
- result = getStatus("?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- InstanceUtil.validateResponse(result, 5, 4, 1, 0, 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
deleted file mode 100644
index 6a12fc8..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.TestNGException;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Process late data test.
- */
-@Test(groups = "embedded")
-public class ProcessLateRerunTest extends BaseTestClass {
- private ColoHelper cluster1 = servers.get(0);
- private OozieClient cluster1OC = serverOC.get(0);
- private FileSystem cluster1FS = serverFS.get(0);
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
- private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(ProcessLateRerunTest.class);
- /** Number of one-minute waits performed before the number of rerun attempts is validated. */
- private static final int LATE_RERUN_WAIT_MINS = 6;
-
- @BeforeClass(alwaysRun = true)
- public void uploadWorkflow() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- Bundle bundle = BundleUtil.readLateDataBundle();
- bundles[0] = new Bundle(bundle, servers.get(0));
- bundles[0].generateUniqueBundle(this);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setInputFeedDataPath(feedInputPath);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Test demonstrates rerunning process for late arrival of data.
- * Initially there is no input data and empty folders are processed.
- * It checks the number of rerun attempts once late data has been added
- * ensuring that late rerun happened.
- */
- @Test(enabled = true)
- public void testProcessLateRerunOnEmptyFolder() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 30);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
- bundles[0].setProcessConcurrency(2);
-
- String inputName = bundles[0].getProcessObject().getFirstInputName();
- bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir));
-
- scheduleProcessAndWaitForInstances();
- getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 1);
- waitAndValidateRetryAttempts(1);
- }
-
- /**
- * Test demonstrates rerunning process for late arrival of data.
- * Initially there is some data which is processed. It checks the number of rerun attempts
- * once further more data has been added ensuring that late rerun happened.
- */
- @Test(enabled = true)
- public void testProcessLateRerunWithData() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 30);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
- bundles[0].setProcessConcurrency(2);
-
- String inputName = bundles[0].getProcessObject().getFirstInputName();
- bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
-
- scheduleProcessAndWaitForInstances();
- getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, true, 1);
- waitAndValidateRetryAttempts(1);
- }
-
- /**
- * Test demonstrates rerunning process for late arrival of data for multiple input folders.
- * It checks the number of rerun attempts once further more data has been added ensuring that late rerun happened.
- */
- @Test(enabled = true)
- public void testProcessLateRerunWithMultipleFolders() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 30);
- String startInstance = "now(0,-5)";
- String endInstance = "now(0,0)";
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
- String inputName = bundles[0].getProcessObject().getFirstInputName();
-
- bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
- bundles[0].setProcessConcurrency(2);
-
- // Increase the window of input for process
- bundles[0].setDatasetInstances(startInstance, endInstance);
-
- scheduleProcessAndWaitForInstances();
- getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3);
- waitAndValidateRetryAttempts(1);
- }
-
- /**
- * Test demonstrates rerunning process for late arrival of data for gate folders.
- * Late rerun will not work on gate folder, so no retry attempt on the appended data.
- */
- @Test(enabled = true)
- public void testProcessLateRerunWithGate() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 30);
- String startInstance = "now(0,-5)";
- String endInstance = "now(0,0)";
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes);
- bundles[0].setProcessConcurrency(2);
-
- // Increase the window of input for process
- bundles[0].setDatasetInstances(startInstance, endInstance);
-
- ProcessMerlin process = bundles[0].getProcessObject();
- String inputName = process.getFirstInputName();
- Input tempFeed = process.getInputs().getInputs().get(0);
-
- // additional input named "Gate" which reuses the feed of the first input
- Input gateInput = new Input();
- gateInput.setName("Gate");
- gateInput.setFeed(tempFeed.getFeed());
- gateInput.setEnd("now(0,1)");
- gateInput.setStart("now(0,1)");
- process.getInputs().getInputs().add(gateInput);
- bundles[0].setProcessData(process.toString());
-
- bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir));
-
- scheduleProcessAndWaitForInstances();
- getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 7);
- waitAndValidateRetryAttempts(0);
- }
-
- /**
- * Submits and schedules the process of bundles[0], checks that it is running and waits till
- * its instances are created.
- *
- * @throws Exception
- */
- private void scheduleProcessAndWaitForInstances() throws Exception {
- bundles[0].submitAndScheduleProcess();
- AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
- TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0);
- }
-
- /**
- * Waits LATE_RERUN_WAIT_MINS one-minute intervals, waits till one instance of the process of
- * bundles[0] is succeeded and validates the number of retry attempts of its first bundle job.
- *
- * @param expectedAttempts expected number of retry attempts
- * @throws Exception
- */
- private void waitAndValidateRetryAttempts(int expectedAttempts) throws Exception {
- for (int minute = 0; minute < LATE_RERUN_WAIT_MINS; minute++) {
- LOGGER.info("Waiting...");
- TimeUtil.sleepSeconds(60);
- }
- InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
- List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- bundles[0].getProcessName(), EntityType.PROCESS);
- String bundleID = bundleList.get(0);
-
- OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, expectedAttempts);
- }
-
- /**
- * Creates missing dependencies of the process, waits till one instance is succeeded and then
- * uploads late data into one of the dependency folders.
- *
- * @param prismHelper helper used to look up bundle jobs and to create missing dependencies
- * @param bundle bundle which contains the process under test
- * @param oozieClient oozie client used to get missing dependencies and instance state
- * @param clusterFS file system to which data is uploaded
- * @param dataFlag denotes whether process should run initially on empty folders or folders
- * containing data; if true data is uploaded to the first dependency folder
- * @param dataFolder denotes the folder where you want to upload data for late rerun, as
- * 1-based position in the list of missing dependencies; nothing is uploaded
- * if the list has no such position
- */
- private void getAndCreateDependencies(ColoHelper prismHelper, Bundle bundle,
- OozieClient oozieClient, FileSystem clusterFS,
- boolean dataFlag, int dataFolder) {
- try {
- // named bundleIds to avoid hiding the bundles array used by the tests
- List<String> bundleIds = null;
- for (int i = 0; i < 10; ++i) {
- bundleIds = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- bundle.getProcessName(), EntityType.PROCESS);
- if (bundleIds.size() > 0) {
- break;
- }
- TimeUtil.sleepSeconds(30);
- }
- Assert.assertTrue(bundleIds != null && bundleIds.size() > 0, "Bundle job not created.");
- String bundleID = bundleIds.get(0);
- LOGGER.info("bundle id: " + bundleID);
- List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
- for (int i = 0; i < 10 && missingDependencies == null; ++i) {
- TimeUtil.sleepSeconds(30);
- missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID);
- }
- Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-
- //print missing dependencies
- for (String dependency : missingDependencies) {
- LOGGER.info("dependency from job: " + dependency);
- }
-
- //create missing dependencies
- LOGGER.info("Creating missing dependencies...");
- OozieUtil.createMissingDependencies(prismHelper, EntityType.PROCESS, bundle.getProcessName(), 0, 0);
-
- //Adding data to the first (initially empty) folder depending on dataFlag
- if (dataFlag && !missingDependencies.isEmpty()) {
- String location = missingDependencies.get(0);
- LOGGER.info("Transferring data to : " + location);
- HadoopUtil.copyDataToFolder(clusterFS, location,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- }
-
- //Process succeeding on empty folders
- LOGGER.info("Waiting for process to succeed...");
- InstanceUtil.waitTillInstanceReachState(oozieClient, bundle.getProcessName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
- TimeUtil.sleepSeconds(30);
-
- //Adding data to check late rerun; dataFolder is a 1-based position
- if (dataFolder >= 1 && dataFolder <= missingDependencies.size()) {
- String dependency = missingDependencies.get(dataFolder - 1);
- LOGGER.info("Transferring late data to : " + dependency);
- HadoopUtil.copyDataToFolder(clusterFS, dependency,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties"));
- }
-
- } catch (Exception e) {
- // report through the test logger instead of printing to stderr, then fail the test
- LOGGER.error("Failed to prepare dependencies for late rerun", e);
- throw new TestNGException(e);
- }
- }
-
- /**
- * Builds late process definition with a single late input.
- *
- * @param delay value of the delay
- * @param delayUnits units of the delay, combined with delay as delayUnits(delay)
- * @param retryType value from which the policy type is resolved
- * @param inputData name of the input which is set on the late input
- * @param workflowDir workflow path which is set on the late input
- * @return late process definition
- */
- private static LateProcess getLateData(int delay, String delayUnits, String retryType,
- String inputData, String workflowDir) {
- LateInput lateInput = new LateInput();
- lateInput.setInput(inputData);
- lateInput.setWorkflowPath(workflowDir);
- LateProcess lateProcess = new LateProcess();
- lateProcess.setDelay(new Frequency(delayUnits + "(" + delay + ")"));
- lateProcess.setPolicy(PolicyType.fromValue(retryType));
- lateProcess.getLateInputs().add(lateInput);
- return lateProcess;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
deleted file mode 100644
index 8422796..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job.Status;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.FileOutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests with process lib folder with workflow.xml.
- */
-@Test(groups = "embedded")
-public class ProcessLibPathLoadTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private String testDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = testDir + "/aggregator";
- private static final Logger LOGGER = Logger.getLogger(ProcessLibPathLoadTest.class);
-
- private String oozieLib = MerlinConstants.OOZIE_EXAMPLE_LIB;
- private String oozieLibName = oozieLib.substring(oozieLib.lastIndexOf('/') + 1);
- private String filename = OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib", oozieLibName);
- private String processName;
- private String process;
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- LOGGER.info("in @BeforeClass");
- FileUtils.forceMkdir(new File(OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib")));
- saveUrlToFile(oozieLib);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.OOZIE_LIB_FOLDER);
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(testDir + MINUTE_DATE_PATTERN);
- bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(testDir + "/output-data" + MINUTE_DATE_PATTERN);
- bundles[0].setProcessConcurrency(1);
- bundles[0].setProcessLibPath(aggregateWorkflowDir + "/lib");
- process = bundles[0].getProcessData();
- processName = Util.readEntityName(process);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- @AfterClass(alwaysRun = true)
- public void deleteJar() throws IOException {
- File file = new File(filename);
- Assert.assertEquals(file.delete(), true, filename + " is not present.");
- FileUtils.deleteDirectory(new File(OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib")));
- }
-
- /**
- * Test which test a process with jar in lib location.
- * Schedule a process, it should succeed.
- *
- * @throws Exception
- */
- @Test
- public void setRightJarInWorkflowLib() throws Exception {
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
- }
-
- /**
- * Test which test a process with no jar in lib location.
- * Schedule a process, it should get killed.
- *
- * @throws Exception
- */
- @Test
- public void setNoJarInWorkflowLibLocation() throws Exception {
- HadoopUtil.deleteDirIfExists(aggregateWorkflowDir + "/lib/" + oozieLibName, clusterFS);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.KILLED);
- }
-
- /**
- * Function to download jar at remote public location.
- * @param urlString public location from where jar is to be downloaded
- * filename is the location where the jar is to be saved
- * @throws Exception
- */
- private void saveUrlToFile(String urlString)
- throws IOException {
-
- URL url = new URL(urlString);
- String link;
- HttpURLConnection http = (HttpURLConnection) url.openConnection();
- Map<String, List<String>> header = http.getHeaderFields();
- while (isRedirected(header)) {
- link = header.get("Location").get(0);
- url = new URL(link);
- http = (HttpURLConnection) url.openConnection();
- header = http.getHeaderFields();
- }
-
- InputStream input = http.getInputStream();
- byte[] buffer = new byte[4096];
- int n;
- OutputStream output = new FileOutputStream(new File(filename));
- while ((n = input.read(buffer)) != -1) {
- output.write(buffer, 0, n);
- }
- output.close();
- }
-
- private static boolean isRedirected(Map<String, List<String>> header) {
- for (String hv : header.get(null)) {
- if (hv.contains(" 301 ") || hv.contains(" 302 ")) {
- return true;
- }
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
deleted file mode 100644
index 4196d99..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job.Status;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Tests with process lib folder detached from workflow.xml.
- */
-@Test(groups = "embedded")
-public class ProcessLibPathTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private String testDir = cleanAndGetTestDir();
- private String testLibDir = testDir + "/TestLib";
- private static final Logger LOGGER = Logger.getLogger(ProcessLibPathTest.class);
- private String processName;
- private String process;
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- LOGGER.info("in @BeforeClass");
- Bundle b = BundleUtil.readELBundle();
- b.generateUniqueBundle(this);
- b = new Bundle(b, cluster);
- String startDate = "2010-01-01T22:00Z";
- String endDate = "2010-01-02T03:00Z";
- b.setInputFeedDataPath(testDir + "/input" + MINUTE_DATE_PATTERN);
- String prefix = b.getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
- List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(testDir + MINUTE_DATE_PATTERN);
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(testDir + "/output-data" + MINUTE_DATE_PATTERN);
- bundles[0].setProcessConcurrency(1);
- bundles[0].setProcessLibPath(testLibDir);
- process = bundles[0].getProcessData();
- processName = Util.readEntityName(process);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Tests a process with no lib folder in workflow location.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void setDifferentLibPathWithNoLibFolderInWorkflowfLocaltion() throws Exception {
- String workflowDir = testLibDir + "/aggregatorLib1/";
- HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
- bundles[0].setProcessWorkflow(workflowDir);
- LOGGER.info("processData: " + Util.prettyPrintXml(process));
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
- }
-
- /**
- * Test which test a process with wrong jar in lib folder in workflow location.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void setDifferentLibPathWithWrongJarInWorkflowLib() throws Exception {
- String workflowDir = testLibDir + "/aggregatorLib2/";
- HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
- HadoopUtil.recreateDir(clusterFS, workflowDir + "/lib");
- HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib/invalid.jar",
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- bundles[0].setProcessWorkflow(workflowDir);
- LOGGER.info("processData: " + Util.prettyPrintXml(process));
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
deleted file mode 100644
index f4c9b30..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
-* Process SLA tests.
-*/
-@Test(groups = "embedded")
-public class ProcessSLATest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private static final Logger LOGGER = Logger.getLogger(ProcessSLATest.class);
-
- @BeforeClass(alwaysRun = true)
- public void uploadWorkflow() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 20);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].submitClusters(prism);
- bundles[0].setInputFeedDataPath(baseTestHDFSDir + MINUTE_DATE_PATTERN);
- bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN);
- bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
- bundles[0].submitFeeds(prism);
- bundles[0].setProcessConcurrency(1);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Schedule process with correctly adjusted sla. Response should reflect success.
- *
- */
- @Test
- public void scheduleValidProcessSLA() throws Exception {
-
- ProcessMerlin processMerlin = bundles[0].getProcessObject();
- processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
- new Frequency("6", Frequency.TimeUnit.hours));
- bundles[0].setProcessData(processMerlin.toString());
- ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
- AssertUtil.assertSucceeded(response);
- }
-
- /**
- * Schedule process with slaStart and slaEnd having equal value. Response should reflect success.
- *
- */
- @Test
- public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
-
- ProcessMerlin processMerlin = bundles[0].getProcessObject();
- processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
- new Frequency("3", Frequency.TimeUnit.hours));
- bundles[0].setProcessData(processMerlin.toString());
- ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
- AssertUtil.assertSucceeded(response);
- }
-
- /**
- * Schedule process with slaEnd less than slaStart. Response should reflect failure.
- *
- */
- @Test
- public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
-
- ProcessMerlin processMerlin = bundles[0].getProcessObject();
- processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
- new Frequency("2", Frequency.TimeUnit.hours));
- bundles[0].setProcessData(processMerlin.toString());
- ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
- LOGGER.info("response : " + response.getMessage());
-
- String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
- + processMerlin.getSla().getShouldStartIn().getFrequency() + ")is greater than shouldEndIn: "
- + processMerlin.getSla().getShouldEndIn().getTimeUnit() +"("
- + processMerlin.getSla().getShouldEndIn().getFrequency() + ")";
- validate(response, message);
- }
-
- /**
- * Schedule process with timeout greater than slaStart. Response should reflect success.
- *
- */
- @Test
- public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
-
- ProcessMerlin processMerlin = bundles[0].getProcessObject();
- processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
- processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
- new Frequency("4", Frequency.TimeUnit.hours));
- bundles[0].setProcessData(processMerlin.toString());
- ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
- AssertUtil.assertSucceeded(response);
- }
-
- /**
- * Schedule process with timeout less than slaStart. Response should reflect failure.
- *
- */
- @Test
- public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
-
- ProcessMerlin processMerlin = bundles[0].getProcessObject();
- processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
- processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
- new Frequency("4", Frequency.TimeUnit.hours));
- bundles[0].setProcessData(processMerlin.toString());
- ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
-
- String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
- + processMerlin.getSla().getShouldStartIn().getFrequency() + ") is greater than timeout: "
- +processMerlin.getTimeout().getTimeUnit() +"(" + processMerlin.getTimeout().getFrequency() + ")";
- validate(response, message);
- }
-
- private void validate(ServiceResponse response, String message) throws Exception {
- AssertUtil.assertFailed(response);
- LOGGER.info("Expected message is : " + message);
- Assert.assertTrue(response.getMessage().contains(message),
- "Correct response was not present in process schedule. Process response is : "
- + response.getMessage());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java
deleted file mode 100644
index dbb45a6..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Tests related to update feature.
- */
-@Test(groups = "embedded")
-public class ProcessUpdateTest extends BaseTestClass {
-
- private OozieClient clusterOC = serverOC.get(0);
- private String baseTestHDFSDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
- private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(ProcessUpdateTest.class);
-
- @BeforeClass(alwaysRun = true)
- public void uploadWorkflow() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- Bundle bundle = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundle, servers.get(0));
- bundles[0].generateUniqueBundle(this);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setInputFeedDataPath(feedInputPath);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- }
-
- /**
- * Test for https://issues.apache.org/jira/browse/FALCON-99.
- * Scenario: schedule a process which doesn't have late data handling and then update it to have it.
- * Check that new coordinator was created.
- */
- @Test
- public void updateProcessWithLateData() throws Exception {
- String start = TimeUtil.getTimeWrtSystemTime(-60);
- String end = TimeUtil.getTimeWrtSystemTime(10);
- bundles[0].submitAndScheduleAllFeeds();
- ProcessMerlin process = bundles[0].getProcessObject();
- process.setValidity(start, end);
- process.setLateProcess(null);
- prism.getProcessHelper().submitAndSchedule(process.toString());
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, process.toString(), 0);
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, process.getName(), EntityType.PROCESS);
-
- //update process to have late data handling
- LateProcess lateProcess = new LateProcess();
- lateProcess.setDelay(new Frequency("hours(1)"));
- lateProcess.setPolicy(PolicyType.EXP_BACKOFF);
- LateInput lateInput = new LateInput();
- lateInput.setInput("inputData");
- lateInput.setWorkflowPath(aggregateWorkflowDir);
- lateProcess.getLateInputs().add(lateInput);
- process.setLateProcess(lateProcess);
- LOGGER.info("Updated process xml: " + Util.prettyPrintXml(process.toString()));
- AssertUtil.assertSucceeded(prism.getProcessHelper().update(process.toString(), process.toString()));
-
- //check that new coordinator was created
- String newBundleId = OozieUtil.getLatestBundleID(clusterOC, process.getName(), EntityType.PROCESS);
- Assert.assertNotEquals(bundleId, newBundleId, "New Bundle should be created.");
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
-}