You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2015/03/03 06:14:17 UTC

falcon git commit: FALCON-974 add test in falcon-regression for log mover feature. Contributed by Pragya M

Repository: falcon
Updated Branches:
  refs/heads/master 108b36d42 -> 23b325221


FALCON-974 add test in falcon-regression for log mover feature. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/23b32522
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/23b32522
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/23b32522

Branch: refs/heads/master
Commit: 23b325221b4758e1eb5df75726382f5025b162cb
Parents: 108b36d
Author: samarthg <sa...@apacge.org>
Authored: Tue Mar 3 10:32:02 2015 +0530
Committer: samarthg <sa...@apacge.org>
Committed: Tue Mar 3 10:32:02 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../falcon/regression/core/bundle/Bundle.java   |   7 +
 .../apache/falcon/regression/LogMoverTest.java  | 170 +++++++++++++++++++
 .../merlin/src/test/resources/LogMover/id.pig   |  21 +++
 .../src/test/resources/LogMover/workflow.xml    |  39 +++++
 5 files changed, 239 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 8bd89ef..8df604a 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
 
   NEW FEATURES
    
+   FALCON-974 add test in falcon-regression for log mover feature (Pragya M via Samarth Gupta)
+   
    FALCON-843 add test to support current & last week el expression(Pragya M via Samarth G)
 
    FALCON-1035 Add test in falcon regression for validate feature. Validate is exposed 

http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 8e449fb..0dfd895 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -638,6 +638,13 @@ public class Bundle {
         writeFeedElement(feedElement, feedName);
     }
 
+    public void setOutputFeedAvailabilityFlag(String flag) {
+        final String outputFeedName = getOutputFeedNameFromBundle();
+        final FeedMerlin outputFeed = getFeedElement(outputFeedName);
+        outputFeed.setAvailabilityFlag(flag);
+        writeFeedElement(outputFeed, outputFeedName);
+    }
+
     public void setCLusterColo(String colo) {
         ClusterMerlin c = getClusterElement();
         c.setColo(colo);

http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
new file mode 100644
index 0000000..4ce6026
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Properties;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * LogMover Test.
+ * Adds job launcher success/failure logs to falcon staging directory.
+ * The feature does not work for map-reduce actions (FALCON-1038),
+ * so a pig action is used to test it.
+ */
+@Test(groups = "embedded")
+public class LogMoverTest extends BaseTestClass {
+
+    private static final Logger LOGGER = Logger.getLogger(LogMoverTest.class);
+    private final ColoHelper cluster = servers.get(0);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private final String pigTestDir = cleanAndGetTestDir();
+    private final String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
+    private final String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN;
+    private final String propPath = pigTestDir + "/LogMover";
+    private String processName;
+    private String process;
+    private String startDate;
+    private String endDate;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        LOGGER.info("in @BeforeMethod");
+        startDate = TimeUtil.getTimeWrtSystemTime(-3);
+        endDate = TimeUtil.getTimeWrtSystemTime(3);
+
+        LOGGER.info("startDate : " + startDate + " , endDate : " + endDate);
+        // copy pig script and workflow
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES + "LogMover");
+        Bundle bundle = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundle, cluster);
+        bundles[0].generateUniqueBundle(this);
+        bundles[0].setInputFeedDataPath(inputPath);
+        bundles[0].setInputFeedPeriodicity(1, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN);
+        bundles[0].setOutputFeedAvailabilityFlag("_SUCCESS");
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setProcessInputNames("INPUT");
+        bundles[0].setProcessOutputNames("OUTPUT");
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(1, TimeUnit.minutes);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+                bundles[0].getFeedDataPathPrefix(), dataDates);
+
+        // Expose propPath as the "inputPath" process property; the workflow hands it to the pig script
+        final Process processElement = bundles[0].getProcessObject();
+        final Properties properties = new Properties();
+        final Property property = new Property();
+        property.setName("inputPath");
+        property.setValue(propPath);
+        properties.getProperties().add(property);
+        processElement.setProperties(properties);
+        bundles[0].setProcessData(processElement.toString());
+        process = bundles[0].getProcessData();
+        processName = Util.readEntityName(process);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeTestClassEntities();
+    }
+
+    /**
+     * Schedule a process. It should succeed and job launcher success information
+     * should be present in falcon staging directory.
+     */
+    @Test(groups = {"singleCluster"})
+    public void logMoverSucceedTest() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+
+        // Copy data to propPath so that the pig job's "cp" step succeeds
+        HadoopUtil.copyDataToFolder(clusterFS, propPath, OSUtil.RESOURCES + "pig");
+
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        Assert.assertTrue(validate(true), "Success logs are not present");
+    }
+
+    /**
+     * Schedule a process. It should fail and job launcher failure information
+     * should be present in falcon staging directory.
+     */
+    @Test(groups = {"singleCluster"})
+    public void logMoverFailTest() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+        // Nothing is copied to propPath here, so the pig job is expected to fail
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+                CoordinatorAction.Status.KILLED, EntityType.PROCESS);
+
+        Assert.assertTrue(validate(false), "Failed logs are not present");
+    }
+
+    /**
+     * Checks that the log mover copied job launcher logs for the expected
+     * outcome into this process's logs directory under the staging location.
+     *
+     * @param logFlag true to look for SUCCEEDED logs, false to look for FAILED logs
+     * @return true if the path of any file under that logs directory
+     *         contains the expected status string
+     */
+    private boolean validate(boolean logFlag) throws Exception {
+        final String expectedStatus = logFlag ? "SUCCEEDED" : "FAILED";
+        final String path = MerlinConstants.STAGING_LOCATION + "/falcon/workflows/process/"
+                + processName + "/logs";
+        final List<Path> logMoverPaths = HadoopUtil
+                .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
+        for (Path logMoverPath : logMoverPaths) {
+            if (logMoverPath.toString().contains(expectedStatus)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/id.pig
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LogMover/id.pig b/falcon-regression/merlin/src/test/resources/LogMover/id.pig
new file mode 100644
index 0000000..2d5e64b
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/LogMover/id.pig
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+A = LOAD '$INPUT' USING PigStorage(':');
+B = FOREACH A GENERATE $0 AS id;
+STORE B INTO '$OUTPUT' USING PigStorage();
+cp $inputNew $outputNew

http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
new file mode 100644
index 0000000..906481c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="copy-workflow">
+    <start to="aggregator" />
+    <action name="aggregator">
+        <pig>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <script>id.pig</script>
+            <param>INPUT=${INPUT}</param>
+            <param>OUTPUT=${OUTPUT}</param>
+            <param>inputNew=${inputPath}</param>
+            <param>outputNew="$OUTPUT/op/"</param>
+            <!-- inputNew/outputNew feed the trailing "cp" in id.pig. NOTE(review): "$OUTPUT" above is not Oozie EL; presumably expanded by Pig - confirm. -->
+        </pig>
+        <ok to="end"/>
+        <error to="failPig"/>
+    </action>
+    <!-- Error transition target; LogMoverTest.logMoverFailTest expects the coordinator action to end KILLED. -->
+    <kill name="failPig">
+    <message>The Pig job has failed.Reason: message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>