You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2015/03/03 06:14:17 UTC
falcon git commit: FALCON-974 add test in falcon-regression for log
mover feature. Contributed by Pragya M
Repository: falcon
Updated Branches:
refs/heads/master 108b36d42 -> 23b325221
FALCON-974 add test in falcon-regression for log mover feature. Contributed by Pragya M
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/23b32522
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/23b32522
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/23b32522
Branch: refs/heads/master
Commit: 23b325221b4758e1eb5df75726382f5025b162cb
Parents: 108b36d
Author: samarthg <sa...@apacge.org>
Authored: Tue Mar 3 10:32:02 2015 +0530
Committer: samarthg <sa...@apacge.org>
Committed: Tue Mar 3 10:32:02 2015 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../falcon/regression/core/bundle/Bundle.java | 7 +
.../apache/falcon/regression/LogMoverTest.java | 170 +++++++++++++++++++
.../merlin/src/test/resources/LogMover/id.pig | 21 +++
.../src/test/resources/LogMover/workflow.xml | 39 +++++
5 files changed, 239 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 8bd89ef..8df604a 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
NEW FEATURES
+ FALCON-974 add test in falcon-regression for log mover feature(Pragya via Samarth Gupta)
+
FALCON-843 add test to support current & last week el expression(Pragya M via Samarth G)
FALCON-1035 Add test in falcon regression for validate feature. Validate is exposed
http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 8e449fb..0dfd895 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -638,6 +638,13 @@ public class Bundle {
writeFeedElement(feedElement, feedName);
}
+    /**
+     * Sets the availability flag on the output feed of this bundle and
+     * writes the modified feed back into the bundle.
+     *
+     * @param flag availability flag value to set on the output feed
+     */
+    public void setOutputFeedAvailabilityFlag(String flag) {
+        final String outputFeedName = getOutputFeedNameFromBundle();
+        final FeedMerlin outputFeed = getFeedElement(outputFeedName);
+        outputFeed.setAvailabilityFlag(flag);
+        writeFeedElement(outputFeed, outputFeedName);
+    }
+
public void setCLusterColo(String colo) {
ClusterMerlin c = getClusterElement();
c.setColo(colo);
http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
new file mode 100644
index 0000000..4ce6026
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Properties;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * LogMover Test.
+ * Adds job launcher success/failure logs to falcon staging directory.
+ * It is not working for map-reduce actions(FALCON-1038).
+ * Using pig-action to test this feature.
+ */
+@Test(groups = "embedded")
+public class LogMoverTest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
+ private String pigTestDir = cleanAndGetTestDir();
+ private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
+ private String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN;
+ private String propPath = pigTestDir + "/LogMover";
+ private static final Logger LOGGER = Logger.getLogger(LogMoverTest.class);
+ private String processName;
+ private String process;
+ private String startDate;
+ private String endDate;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ LOGGER.info("in @BeforeMethod");
+ startDate = TimeUtil.getTimeWrtSystemTime(-3);
+ endDate = TimeUtil.getTimeWrtSystemTime(3);
+
+ LOGGER.info("startDate : " + startDate + " , endDate : " + endDate);
+ //copy pig script and workflow
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES + "LogMover");
+ Bundle bundle = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundle, cluster);
+ bundles[0].generateUniqueBundle(this);
+ bundles[0].setInputFeedDataPath(inputPath);
+ bundles[0].setInputFeedPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN);
+ bundles[0].setOutputFeedAvailabilityFlag("_SUCCESS");
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessInputNames("INPUT");
+ bundles[0].setProcessOutputNames("OUTPUT");
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(1, TimeUnit.minutes);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+ bundles[0].getFeedDataPathPrefix(), dataDates);
+
+ // Defining path to be used in pig script
+ final Process processElement = bundles[0].getProcessObject();
+ final Properties properties = new Properties();
+ final Property property = new Property();
+ property.setName("inputPath");
+ property.setValue(propPath);
+ properties.getProperties().add(property);
+ processElement.setProperties(properties);
+ bundles[0].setProcessData(processElement.toString());
+ process = bundles[0].getProcessData();
+ processName = Util.readEntityName(process);
+
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeTestClassEntities();
+ }
+
+ /**
+ *Schedule a process. It should succeed and job launcher success information
+ * should be present in falcon staging directory.
+ */
+ @Test(groups = {"singleCluster"})
+ public void logMoverSucceedTest() throws Exception {
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+
+ //Copy data to let pig job succeed
+ HadoopUtil.copyDataToFolder(clusterFS, propPath, OSUtil.RESOURCES + "pig");
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ Assert.assertTrue(validate(true), "Success logs are not present");
+ }
+
+ /**
+ *Schedule a process. It should fail and job launcher failure information
+ * should be present in falcon staging directory.
+ */
+ @Test(groups = {"singleCluster"})
+ public void logMoverFailTest() throws Exception {
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
+ CoordinatorAction.Status.KILLED, EntityType.PROCESS);
+
+ Assert.assertTrue(validate(false), "Filed logs are not present");
+ }
+
+ private boolean validate(boolean logFlag) throws Exception {
+ String stagingDir= MerlinConstants.STAGING_LOCATION;
+ String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs";
+ List<Path> logmoverPath = HadoopUtil
+ .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
+ if (logFlag) {
+ for(int index=0; index < logmoverPath.size(); index++) {
+ if (logmoverPath.get(index).toString().contains("SUCCEEDED")) {
+ return true;
+ }
+ }
+ } else {
+ for(int index=0; index < logmoverPath.size(); index++) {
+ if (logmoverPath.get(index).toString().contains("FAILED")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/id.pig
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LogMover/id.pig b/falcon-regression/merlin/src/test/resources/LogMover/id.pig
new file mode 100644
index 0000000..2d5e64b
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/LogMover/id.pig
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- Read the ':'-delimited rows found at the INPUT parameter.
+raw_rows = load '$INPUT' using PigStorage(':');
+-- Keep only the first field of every row and name it id.
+id_rows = foreach raw_rows generate $0 as id;
+-- Store the ids at the OUTPUT parameter using PigStorage() with no explicit delimiter.
+store id_rows into '$OUTPUT' USING PigStorage();
+-- Copy the inputNew location to the outputNew location (both are script parameters).
+cp $inputNew $outputNew
http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
new file mode 100644
index 0000000..906481c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml
@@ -0,0 +1,39 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="copy-workflow">
+ <!-- Single-action workflow: runs id.pig and ends, or is killed through failPig on error. -->
+ <start to="aggregator" />
+ <action name="aggregator">
+ <pig>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <script>id.pig</script>
+ <!-- Parameters handed to id.pig: INPUT and OUTPUT are forwarded unchanged. -->
+ <param>INPUT=${INPUT}</param>
+ <param>OUTPUT=${OUTPUT}</param>
+ <!-- inputNew is taken from the inputPath property; outputNew is the op/ directory under OUTPUT. -->
+ <param>inputNew=${inputPath}</param>
+ <param>outputNew="$OUTPUT/op/"</param>
+
+ </pig>
+ <ok to="end"/>
+ <error to="failPig"/>
+ </action>
+
+ <!-- Kill node reached when the pig action fails; reports the error message of the last failed node. -->
+ <kill name="failPig">
+ <message>The Pig job has failed.Reason: message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>