You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/29 23:02:09 UTC
git commit: FALCON-743 Adding tests for cases related to usage of
pipelines tag. Contributed by Paul Isaychuk
Repository: incubator-falcon
Updated Branches:
refs/heads/master 62cf8b4aa -> 9c74ee459
FALCON-743 Adding tests for cases related to usage of pipelines tag. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9c74ee45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9c74ee45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9c74ee45
Branch: refs/heads/master
Commit: 9c74ee459a939a4b3aff31ad83b922b9c08e94da
Parents: 62cf8b4
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Mon Sep 29 13:59:46 2014 -0700
Committer: Raghav Kumar Gautam <ra...@apache.org>
Committed: Mon Sep 29 14:01:12 2014 -0700
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../regression/Entities/ProcessMerlin.java | 13 ++
.../falcon/regression/core/bundle/Bundle.java | 9 +
.../core/helpers/ClusterEntityHelperImpl.java | 4 +
.../core/helpers/DataEntityHelperImpl.java | 5 +
.../core/interfaces/IEntityManagerHelper.java | 15 ++
.../core/response/ServiceResponse.java | 20 ++
.../regression/lineage/ProcessPipelineTest.java | 184 +++++++++++++++++++
8 files changed, 253 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 48a0792..573e8f6 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-743 Adding tests for cases related to usage of pipelines tag
+ (Paul Isaychuk via Raghav Kumar Gautam)
+
FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G
via Samarth Gupta)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 48ee049..a10dfc1 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression.Entities;
import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.falcon.entity.v0.EntityType;
@@ -239,6 +240,18 @@ public class ProcessMerlin extends Process {
setOutputs(os);
setLateProcess(null);
}
+
+    /**
+     * Sets the process pipelines tag.
+     * Several names are stored as one comma-separated string; passing no names (or null)
+     * resets the tag to null.
+     *
+     * @param pipelines pipeline names to set on the process
+     */
+    public void setPipelineTag(String... pipelines){
+        // a null or empty array clears the tag instead of storing an empty string
+        this.pipelines = ArrayUtils.isNotEmpty(pipelines) ? StringUtils.join(pipelines, ",") : null;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 6d8b032..eac3cb4 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -1015,4 +1015,13 @@ public class Bundle {
return feedObject.getName();
}
+    /**
+     * Applies the given pipeline names to the process data held by this bundle.
+     * @param pipelines pipeline names handed to ProcessMerlin#setPipelineTag
+     */
+    public void setProcessPipeline(String... pipelines){
+        final ProcessMerlin merlin = new ProcessMerlin(getProcessData());
+        merlin.setPipelineTag(pipelines);
+        setProcessData(merlin.toString());
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
index 49e4b06..38ca37d 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
@@ -119,4 +119,8 @@ public class ClusterEntityHelperImpl extends IEntityManagerHelper {
throw new UnsupportedOperationException(INVALID_ERR);
}
+    @Override
+    public ServiceResponse getListByPipeline(final String pipeline) {
+        throw new UnsupportedOperationException(INVALID_ERR); // not offered by the cluster helper
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
index 5127a91..79c3b9c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression.core.helpers;
import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.Util;
/** Helper class to work with feed endpoints of a colo. */
@@ -36,5 +37,9 @@ public class DataEntityHelperImpl extends IEntityManagerHelper {
return Util.readEntityName(entity);
}
+    @Override
+    public ServiceResponse getListByPipeline(final String pipeline) {
+        throw new UnsupportedOperationException("Not valid for Feed Entity."); // feed helper opts out
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
index 7f8ac6a..ec8726c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
@@ -501,4 +501,19 @@ public abstract class IEntityManagerHelper {
return (InstancesResult) InstanceUtil
.createAndSendRequestProcessInstance(url, params, allColo, null);
}
+
+    /**
+     * Lists all entities of this helper's entity type that are tagged with the given pipeline.
+     * @param pipeline pipeline name, appended unencoded as the PIPELINES filter value
+     * @return service response of the list request
+     * @throws AuthenticationException propagated from building or sending the request
+     * @throws IOException propagated from building or sending the request
+     * @throws URISyntaxException propagated from building or sending the request
+     */
+    public ServiceResponse getListByPipeline(String pipeline)
+        throws AuthenticationException, IOException, URISyntaxException {
+        final String base = this.hostname + URLS.LIST_URL.getValue() + "/" + getEntityType();
+        final String url = createUrl(base) + "?filterBy=PIPELINES:" + pipeline;
+        return Util.sendRequest(url, "get", null, null);
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
index 004da4a..3c5bd9e 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
@@ -18,13 +18,18 @@
package org.apache.falcon.regression.core.response;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.falcon.regression.core.util.Util;
import org.apache.http.HttpResponse;
import org.apache.log4j.Logger;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.StringReader;
/** Class to represent falcon's response to a rest request. */
public class ServiceResponse {
@@ -82,4 +87,19 @@ public class ServiceResponse {
public ServiceResponse() {
}
+
+    /**
+     * Unmarshals the response message into an EntitiesResult.
+     * @return the parsed EntitiesResult, or null if the message is null or cannot be unmarshalled
+     */
+    public EntitiesResult getEntitiesResult(){
+        try {
+            Unmarshaller u = JAXBContext.newInstance(EntitiesResult.class).createUnmarshaller();
+            // new StringReader(null) throws NullPointerException, so an absent message yields null
+            return message == null ? null : (EntitiesResult) u.unmarshal(new StringReader(message));
+        } catch (JAXBException e) {
+            LOGGER.info("getEntitiesResult() failed:\n" + ExceptionUtils.getStackTrace(e));
+            return null;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
new file mode 100644
index 0000000..c4f7f72
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.lineage;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.EntityResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.CleanupUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.xml.sax.SAXException;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Submitting and listing processes with different pipeline names.
+ */
+@Test(groups = "embedded")
+public class ProcessPipelineTest extends BaseTestClass{
+    private static final Logger LOGGER = Logger.getLogger(ProcessPipelineTest.class);
+    private ColoHelper cluster = servers.get(0);
+    private String testDir = "/ProcessPipelineTest";
+    private String baseTestHDFSDir = baseHDFSDir + testDir;
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+
+    @BeforeClass(alwaysRun = true)
+    public void setUp() throws IOException {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void prepareData() throws IOException {
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], servers.get(0));
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(){
+        removeBundles();
+    }
+
+    /**
+     * Submit a list of processes tagged with pipelines. There are a few pipelines and each
+     * pipeline has a different number of processes. The test expects the matching list of
+     * processes to be returned for each pipeline tag.
+     */
+    @Test
+    public void listPipeline()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        //match processes to pipelines
+        HashMap<String, List<String>> map = new HashMap<String, List<String>>();
+        //p indexes pipelines; pipeline p gets d + 1 processes, i.e. 4, 5 and 6 of them
+        try{
+            for(int p = 0, i = 0, n = 0, d = 3; p < 3; p++, d++){
+                n += d + 1;
+                String pipeline = "pipeline" + p;
+                map.put(pipeline, new ArrayList<String>());
+                //i keeps counting across pipelines, so every process gets its own name
+                for(; i < n; i++){
+                    String processName = "process-" + i;
+                    bundles[0].setProcessName(processName);
+                    bundles[0].setProcessPipeline(pipeline);
+                    bundles[0].submitProcess(true);
+                    map.get(pipeline).add(processName);
+                }
+            }
+            LOGGER.info("Expected set of processes: " + map);
+            //now go through pipelines and check that their process lists match the expected ones
+            for(int p = 0; p < 3; p++){
+                String pipeline = "pipeline" + p;
+                ServiceResponse response = prism.getProcessHelper().getListByPipeline(pipeline);
+                List<EntityResult> processes = response.getEntitiesResult().getEntities();
+                //check that all retrieved processes are in the expected list
+                List<String> expected = map.get(pipeline);
+                Assert.assertEquals(processes.size(), expected.size(),
+                    String.format("Invalid number of processes for pipeline [%s].", pipeline));
+                for(EntityResult process : processes){
+                    Assert.assertTrue(expected.contains(process.getName()), String.format("Expected "
+                        + "list %s doesn't contain %s for %s.", expected, process.getName(), pipeline));
+                }
+            }
+        } finally {
+            CleanupUtil.cleanAllProcessesQuietly(prism, prism.getProcessHelper());
+        }
+    }
+
+    /**
+     * Submit a process with a pipeline element, then get the process definition expecting the
+     * retrieved xml to be the same as the submitted one.
+     */
+    @Test
+    public void testProcessWithPipeline()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException,
+        SAXException {
+        String pipeline = "samplePipeline";
+        bundles[0].setProcessPipeline(pipeline);
+        bundles[0].submitProcess(true);
+        String process = bundles[0].getProcessData();
+        String processDef = prism.getProcessHelper().getEntityDefinition(process).getMessage();
+        Assert.assertTrue(XmlUtil.isIdentical(process, processDef), "Definitions are not equal.");
+    }
+
+    /**
+     * Test cases:
+     * -list processes with pipeline name having special or UTF-8 chars expecting request to fail.
+     * -list processes with invalid or long pipeline name expecting request to pass.
+     * -submit a process with pipeline name with special or UTF-8 chars expecting submission to fail.
+     * -submit a process with a long pipeline name or many comma separated names expecting
+     * submission to go through.
+     *
+     * @param pipeline pipeline name
+     * @param action list or submit entities
+     * @param shouldSucceed should action succeed or not
+     */
+    @Test(dataProvider = "data")
+    public void testPipelines(String pipeline, String action, boolean shouldSucceed)
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        bundles[0].setProcessPipeline(pipeline);
+        if ("list".equals(action)){
+            if (shouldSucceed){
+                AssertUtil.assertSucceeded(cluster.getProcessHelper().getListByPipeline(pipeline));
+            }else {
+                AssertUtil.assertFailed(cluster.getProcessHelper().getListByPipeline(pipeline));
+            }
+        } else {
+            bundles[0].submitProcess(shouldSucceed);
+        }
+    }
+
+    @DataProvider(name = "data")
+    public Object[][] getTestData(){
+        String specialName = "$pec!alN@me#1";
+        String utf8Name = "UTF8Pipelin" + "\u20AC"; //euro symbol added to name
+        String longName = "TestItWithPipelineNameWhichIsLongEnough";
+        return new Object[][]{{specialName, "list", false}, {utf8Name, "list", false},
+            {"nonexistentPipeline", "list", true}, //expecting blank response
+            {longName, "list", true}, //expecting blank response
+            {specialName, "submit", false}, {utf8Name, "submit", false},
+            {longName, "submit", true},
+            {"pipeline0,pipeline1,pipeline2,pipeline3,pipeline4,pipeline5,pipeline6,pipeline7,"
+                 +"pipeline8,pipeline9,pipeline10,pipeline11", "submit", true}};
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDownClass() throws IOException {
+        cleanTestDirs();
+    }
+}