You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2015/03/05 06:13:00 UTC

falcon git commit: FALCON-1066 Add test in falcon to test process SLA feature. Contributed by Pragya M

Repository: falcon
Updated Branches:
  refs/heads/master 03c86eb69 -> 8fa2c49fd


FALCON-1066 Add test in falcon to test process SLA feature. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8fa2c49f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8fa2c49f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8fa2c49f

Branch: refs/heads/master
Commit: 8fa2c49fd193e6ff2a216e9cc6e615a8225c2ca6
Parents: 03c86eb
Author: samarthg <sa...@apacge.org>
Authored: Thu Mar 5 05:06:54 2015 +0000
Committer: samarthg <sa...@apacge.org>
Committed: Thu Mar 5 05:06:54 2015 +0000

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   4 +-
 .../regression/Entities/ProcessMerlin.java      |  15 ++
 .../falcon/regression/ProcessSLATest.java       | 171 +++++++++++++++++++
 3 files changed, 189 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 09b535d..3079fb0 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,7 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
-   
+  
+   FALCON-1066 Add test in falcon to test process SLA feature(Pragya M via Samarth G)
+ 
    FALCON-964 add test in falcon regression to test loading of jar present in user lib(Pragya M 
    via Samarth Gupta)
    

http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 36b0df5..f869844 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Sla;
 import org.apache.falcon.entity.v0.process.ACL;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
@@ -263,6 +265,19 @@ public class ProcessMerlin extends Process {
         this.setACL(acl);
     }
 
+    /**
+     * Set SLA.
+     * @param slaStart : start value of SLA
+     * @param slaEnd : end value of SLA
+     */
+
+    public void setSla(Frequency slaStart, Frequency slaEnd) {
+        Sla sla = new Sla();
+        sla.setShouldStartIn(slaStart);
+        sla.setShouldEndIn(slaEnd);
+        this.setSla(sla);
+    }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/8fa2c49f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
new file mode 100644
index 0000000..cd7eba4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+* Process SLA tests.
+*/
+@Test(groups = "embedded")
+public class ProcessSLATest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private String baseTestHDFSDir = cleanAndGetTestDir();
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private static final Logger LOGGER = Logger.getLogger(ProcessSLATest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 20);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle(this);
+        bundles[0].submitClusters(prism);
+        bundles[0].setInputFeedDataPath(baseTestHDFSDir + MINUTE_DATE_PATTERN);
+        bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN);
+        bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
+        bundles[0].submitFeeds(prism);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeTestClassEntities();
+    }
+
+    /**
+     * Schedule process with correctly adjusted sla. Response should reflect success.
+     *
+     */
+    @Test
+    public void scheduleValidProcessSLA() throws Exception {
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
+                new Frequency("6", Frequency.TimeUnit.hours));
+        bundles[0].setProcessData(processMerlin.toString());
+        ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+    }
+
+    /**
+     * Schedule process with slaStart and slaEnd having equal value. Response should reflect success.
+     *
+     */
+    @Test
+    public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception {
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours),
+                new Frequency("3", Frequency.TimeUnit.hours));
+        bundles[0].setProcessData(processMerlin.toString());
+        ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+    }
+
+    /**
+     * Schedule process with slaEnd less than slaStart. Response should reflect failure.
+     *
+     */
+    @Test
+    public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception {
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours),
+                new Frequency("2", Frequency.TimeUnit.hours));
+        bundles[0].setProcessData(processMerlin.toString());
+        ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+        LOGGER.info("response : " + response.getMessage());
+
+        String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
+                + processMerlin.getSla().getShouldStartIn().getFrequency() + ")is greater than shouldEndIn: "
+                + processMerlin.getSla().getShouldEndIn().getTimeUnit() +"("
+                + processMerlin.getSla().getShouldEndIn().getFrequency() + ")";
+        validate(response, message);
+    }
+
+    /**
+     * Schedule process with timeout greater than slaStart. Response should reflect success.
+     *
+     */
+    @Test
+    public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception {
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours));
+        processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
+                new Frequency("4", Frequency.TimeUnit.hours));
+        bundles[0].setProcessData(processMerlin.toString());
+        ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+        AssertUtil.assertSucceeded(response);
+    }
+
+    /**
+     * Schedule process with timeout less than slaStart. Response should reflect failure.
+     *
+     */
+    @Test
+    public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception {
+
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+        processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours));
+        processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours),
+                new Frequency("4", Frequency.TimeUnit.hours));
+        bundles[0].setProcessData(processMerlin.toString());
+        ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString());
+
+        String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "("
+                + processMerlin.getSla().getShouldStartIn().getFrequency() + ") is greater than timeout: "
+                +processMerlin.getTimeout().getTimeUnit() +"(" + processMerlin.getTimeout().getFrequency() + ")";
+        validate(response, message);
+    }
+
+    private void validate(ServiceResponse response, String message) throws Exception {
+        AssertUtil.assertFailed(response);
+        LOGGER.info("Expected message is : " + message);
+        Assert.assertTrue(response.getMessage().contains(message),
+                "Correct response was not present in process schedule. Process response is : "
+                        + response.getMessage());
+    }
+
+}