You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2015/03/05 06:13:00 UTC
falcon git commit: FALCON-1066 Add test in falcon to test process SLA
feature. Contributed by Pragya M
Repository: falcon
Updated Branches:
refs/heads/master 03c86eb69 -> 8fa2c49fd
FALCON-1066 Add test in falcon to test process SLA feature. Contributed by Pragya M
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8fa2c49f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8fa2c49f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8fa2c49f
Branch: refs/heads/master
Commit: 8fa2c49fd193e6ff2a216e9cc6e615a8225c2ca6
Parents: 03c86eb
Author: samarthg <sa...@apacge.org>
Authored: Thu Mar 5 05:06:54 2015 +0000
Committer: samarthg <sa...@apacge.org>
Committed: Thu Mar 5 05:06:54 2015 +0000
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 4 +-
.../regression/Entities/ProcessMerlin.java | 15 ++
.../falcon/regression/ProcessSLATest.java | 171 +++++++++++++++++++
3 files changed, 189 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 09b535d..3079fb0 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,7 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
-
+
+ FALCON-1066 Add test in falcon to test process SLA feature(Pragya M via Samarth G)
+
FALCON-964 add test in falcon regression to test loading of jar present in user lib(Pragya M
via Samarth Gupta)
http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 36b0df5..f869844 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Sla;
import org.apache.falcon.entity.v0.process.ACL;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
@@ -263,6 +265,19 @@ public class ProcessMerlin extends Process {
this.setACL(acl);
}
+ /**
+ * Set SLA.
+ * @param slaStart : start value of SLA
+ * @param slaEnd : end value of SLA
+ */
+
+ public void setSla(Frequency slaStart, Frequency slaEnd) {
+ Sla sla = new Sla();
+ sla.setShouldStartIn(slaStart);
+ sla.setShouldEndIn(slaEnd);
+ this.setSla(sla);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
new file mode 100644
index 0000000..cd7eba4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+* Process SLA tests.
+*/
+@Test(groups = "embedded")
+public class ProcessSLATest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private String baseTestHDFSDir = cleanAndGetTestDir();
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private static final Logger LOGGER = Logger.getLogger(ProcessSLATest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ String startTime = TimeUtil.getTimeWrtSystemTime(0);
+ String endTime = TimeUtil.addMinsToTime(startTime, 20);
+ LOGGER.info("Time range between : " + startTime + " and " + endTime);
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle(this);
+ bundles[0].submitClusters(prism);
+ bundles[0].setInputFeedDataPath(baseTestHDFSDir + MINUTE_DATE_PATTERN);
+ bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN);
+ bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+ bundles[0].submitFeeds(prism);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessValidity(startTime, endTime);
+ bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeTestClassEntities();
+ }
+
+ /**
+ * Schedule process with correctly adjusted sla. Response should reflect success.
+ *
+ */
+ @Test
+ public void scheduleValidProcessSLA() throws Exception {
+
+ ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
+ new Frequency("6", Frequency.TimeUnit.hours));
+ bundles[0].setProcessData(processMerlin.toString());
+ ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+ AssertUtil.assertSucceeded(response);
+ }
+
+ /**
+ * Schedule process with slaStart and slaEnd having equal value. Response should reflect success.
+ *
+ */
+ @Test
+ public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
+
+ ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
+ new Frequency("3", Frequency.TimeUnit.hours));
+ bundles[0].setProcessData(processMerlin.toString());
+ ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+ AssertUtil.assertSucceeded(response);
+ }
+
+ /**
+ * Schedule process with slaEnd less than slaStart. Response should reflect failure.
+ *
+ */
+ @Test
+ public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
+
+ ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
+ new Frequency("2", Frequency.TimeUnit.hours));
+ bundles[0].setProcessData(processMerlin.toString());
+ ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+ LOGGER.info("response : " + response.getMessage());
+
+ String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
+ + processMerlin.getSla().getShouldStartIn().getFrequency() + ")is greater than shouldEndIn: "
+ + processMerlin.getSla().getShouldEndIn().getTimeUnit() +"("
+ + processMerlin.getSla().getShouldEndIn().getFrequency() + ")";
+ validate(response, message);
+ }
+
+ /**
+ * Schedule process with timeout greater than slaStart. Response should reflect success.
+ *
+ */
+ @Test
+ public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
+
+ ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
+ processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
+ new Frequency("4", Frequency.TimeUnit.hours));
+ bundles[0].setProcessData(processMerlin.toString());
+ ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+ AssertUtil.assertSucceeded(response);
+ }
+
+ /**
+ * Schedule process with timeout less than slaStart. Response should reflect failure.
+ *
+ */
+ @Test
+ public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
+
+ ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
+ processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
+ new Frequency("4", Frequency.TimeUnit.hours));
+ bundles[0].setProcessData(processMerlin.toString());
+ ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+
+ String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
+ + processMerlin.getSla().getShouldStartIn().getFrequency() + ") is greater than timeout: "
+ +processMerlin.getTimeout().getTimeUnit() +"(" + processMerlin.getTimeout().getFrequency() + ")";
+ validate(response, message);
+ }
+
+ private void validate(ServiceResponse response, String message) throws Exception {
+ AssertUtil.assertFailed(response);
+ LOGGER.info("Expected message is : " + message);
+ Assert.assertTrue(response.getMessage().contains(message),
+ "Correct response was not present in process schedule. Process response is : "
+ + response.getMessage());
+ }
+
+}