You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/29 23:02:09 UTC

git commit: FALCON-743 Adding tests for cases related to usage of pipelines tag. Contributed by Paul Isaychuk

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 62cf8b4aa -> 9c74ee459


FALCON-743 Adding tests for cases related to usage of pipelines tag. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9c74ee45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9c74ee45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9c74ee45

Branch: refs/heads/master
Commit: 9c74ee459a939a4b3aff31ad83b922b9c08e94da
Parents: 62cf8b4
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Mon Sep 29 13:59:46 2014 -0700
Committer: Raghav Kumar Gautam <ra...@apache.org>
Committed: Mon Sep 29 14:01:12 2014 -0700

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../regression/Entities/ProcessMerlin.java      |  13 ++
 .../falcon/regression/core/bundle/Bundle.java   |   9 +
 .../core/helpers/ClusterEntityHelperImpl.java   |   4 +
 .../core/helpers/DataEntityHelperImpl.java      |   5 +
 .../core/interfaces/IEntityManagerHelper.java   |  15 ++
 .../core/response/ServiceResponse.java          |  20 ++
 .../regression/lineage/ProcessPipelineTest.java | 184 +++++++++++++++++++
 8 files changed, 253 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 48a0792..573e8f6 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-743 Adding tests for cases related to usage of pipelines tag
+   (Paul Isaychuk via Raghav Kumar Gautam)
+
    FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G 
    via Samarth Gupta)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index 48ee049..a10dfc1 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.regression.Entities;
 
 import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.entity.v0.EntityType;
@@ -239,6 +240,18 @@ public class ProcessMerlin extends Process {
         setOutputs(os);
         setLateProcess(null);
     }
+
+    /**
+     * Sets process pipelines tag.
+     * @param pipelines set of pipelines to be set to process
+     */
+    public void setPipelineTag(String... pipelines){
+        if (ArrayUtils.isNotEmpty(pipelines)){
+            this.pipelines = StringUtils.join(pipelines, ",");
+        } else {
+            this.pipelines = null;
+        }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 6d8b032..eac3cb4 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -1015,4 +1015,13 @@ public class Bundle {
         return feedObject.getName();
     }
 
+    /**
+     * Sets process pipelines.
+     * @param pipelines proposed pipelines
+     */
+    public void setProcessPipeline(String... pipelines){
+        ProcessMerlin process = new ProcessMerlin(getProcessData());
+        process.setPipelineTag(pipelines);
+        setProcessData(process.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
index 49e4b06..38ca37d 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
@@ -119,4 +119,8 @@ public class ClusterEntityHelperImpl extends IEntityManagerHelper {
         throw new UnsupportedOperationException(INVALID_ERR);
     }
 
+    @Override
+    public ServiceResponse getListByPipeline(String pipeline){
+        throw new UnsupportedOperationException(INVALID_ERR);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
index 5127a91..79c3b9c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.regression.core.helpers;
 
 import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
 import org.apache.falcon.regression.core.util.Util;
 
 /** Helper class to work with feed endpoints of a colo. */
@@ -36,5 +37,9 @@ public class DataEntityHelperImpl extends IEntityManagerHelper {
         return Util.readEntityName(entity);
     }
 
+    @Override
+    public ServiceResponse getListByPipeline(String pipeline){
+        throw new UnsupportedOperationException("Not valid for Feed Entity.");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
index 7f8ac6a..ec8726c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java
@@ -501,4 +501,19 @@ public abstract class IEntityManagerHelper {
         return (InstancesResult) InstanceUtil
             .createAndSendRequestProcessInstance(url, params, allColo, null);
     }
+
+    /**
+     * Lists all entities which are tagged by a given pipeline.
+     * @param pipeline filter
+     * @return service response
+     * @throws AuthenticationException
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    public ServiceResponse getListByPipeline(String pipeline)
+        throws AuthenticationException, IOException, URISyntaxException {
+        String url = createUrl(this.hostname + URLS.LIST_URL.getValue() + "/" + getEntityType());
+        url += "?filterBy=PIPELINES:" + pipeline;
+        return Util.sendRequest(url, "get", null, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
index 004da4a..3c5bd9e 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
@@ -18,13 +18,18 @@
 
 package org.apache.falcon.regression.core.response;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.http.HttpResponse;
 import org.apache.log4j.Logger;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringReader;
 
 /** Class to represent falcon's response to a rest request. */
 public class ServiceResponse {
@@ -82,4 +87,19 @@ public class ServiceResponse {
 
     public ServiceResponse() {
     }
+
+    /**
+     * Retrieves EntitiesResult from a message if possible.
+     * @return EntitiesResult
+     */
+    public EntitiesResult getEntitiesResult(){
+        try {
+            JAXBContext jc = JAXBContext.newInstance(EntitiesResult.class);
+            Unmarshaller u = jc.createUnmarshaller();
+            return  (EntitiesResult) u.unmarshal(new StringReader(message));
+        } catch (JAXBException e) {
+            LOGGER.info("getEntitiesResult() failed:\n" + ExceptionUtils.getStackTrace(e));
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9c74ee45/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
new file mode 100644
index 0000000..c4f7f72
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ProcessPipelineTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.lineage;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.EntityResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.CleanupUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.xml.sax.SAXException;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Submitting and listing processes with different pipeline names.
+ */
+@Test(groups = "embedded")
+public class ProcessPipelineTest extends BaseTestClass{
+    private static final Logger LOGGER = Logger.getLogger(ProcessPipelineTest.class);
+    private ColoHelper cluster = servers.get(0);
+    private String testDir = "/ProcessPipelineTest";
+    private String baseTestHDFSDir = baseHDFSDir + testDir;
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+
+    @BeforeClass(alwaysRun = true)
+    public void setUp() throws IOException {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void prepareData() throws IOException {
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], servers.get(0));
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(){
+        removeBundles();
+    }
+
+    /**
+     * Submit List of processes with a given pipeline tag. There are few pipelines and for each
+     * pipeline there is different number of processes. Test expects an appropriate list of
+     * processes for each given pipeline tag.
+     */
+    @Test
+    public void listPipeline()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        //match processes to pipelines
+        HashMap<String, List<String>> map = new HashMap<String, List<String>>();
+        //index for few different pipelines
+        try{
+            for(int p = 0, i = 0, n = 0, d = 3; p < 3; p++, d++){
+                n += d + 1;
+                String pipeline = "pipeline" + p;
+                map.put(pipeline, new ArrayList<String>());
+                //index for new processes for current pipeline
+                for(; i < n; i++){
+                    String processName = "process-" + i;
+                    bundles[0].setProcessName(processName);
+                    bundles[0].setProcessPipeline(pipeline);
+                    bundles[0].submitProcess(true);
+                    map.get(pipeline).add(processName);
+                }
+            }
+            LOGGER.info("Expected set of processes: " + map);
+            //now go through pipelines and check that their processes lists match to expected
+            for(int p = 0, n = 3; p < 3; p++, n++){
+                String pipeline = "pipeline" + p;
+                ServiceResponse response = prism.getProcessHelper().getListByPipeline(pipeline);
+                List<EntityResult> processes = response.getEntitiesResult().getEntities();
+                //check that all retrieved processes match to expected list
+                List<String> expected = map.get(pipeline);
+                Assert.assertEquals(processes.size(), expected.size(),
+                    String.format("Invalid number of processes for pipeline [%s].", pipeline));
+                for(EntityResult process : processes){
+                    Assert.assertTrue(expected.contains(process.getName()), String.format("Expected "
+                        + "list %s doesn't contain %s for %s.", expected, process.getName(), pipeline));
+                }
+            }
+        } finally {
+            CleanupUtil.cleanAllProcessesQuietly(prism, prism.getProcessHelper());
+        }
+    }
+
+    /**
+     * Submit a process with pipeline element, get process definition expecting retrieved xml to
+     * be the same.
+     */
+    @Test
+    public void testProcessWithPipeline()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException,
+        SAXException {
+        String pipeline = "samplePipeline";
+        bundles[0].setProcessPipeline(pipeline);
+        bundles[0].submitProcess(true);
+        String process = bundles[0].getProcessData();
+        String processDef = prism.getProcessHelper().getEntityDefinition(process).getMessage();
+        Assert.assertTrue(XmlUtil.isIdentical(process, processDef), "Definitions are not equal.");
+    }
+
+    /**
+     * Test cases:
+     * -list processes with pipeline name having special or UTF-8 chars expecting request to fail.
+     * -list processes with invalid or long pipeline name expecting request to pass.
+     * -submit a process with pipeline name with special or UTF-8 chars expecting submission to fail.
+     * -submit a process with a long pipeline name or many comma separated names process submission
+     * to go through.
+     *
+     * @param pipeline pipeline name
+     * @param action list or submit entities
+     * @param shouldSucceed should action succeed or not
+     */
+    @Test(dataProvider = "data")
+    public void testPipelines(String pipeline, String action, boolean shouldSucceed)
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        bundles[0].setProcessPipeline(pipeline);
+        if (action.equals("list")){
+            if (shouldSucceed){
+                AssertUtil.assertSucceeded(cluster.getProcessHelper().getListByPipeline(pipeline));
+            }else {
+                AssertUtil.assertFailed(cluster.getProcessHelper().getListByPipeline(pipeline));
+            }
+        } else {
+            bundles[0].submitProcess(shouldSucceed);
+        }
+    }
+
+    @DataProvider(name = "data")
+    public Object[][] getTestData(){
+        String specialName = "$pec!alN@me#1";
+        String utf8Name = "UTF8Pipelin" + "\u20AC"; //euro symbol added to name
+        String longName = "TestItWithPipelineNameWhichIsLongEnough";
+        return new Object[][]{{specialName, "list", false}, {utf8Name, "list", false},
+            {"nonexistentPipeline", "list", true}, //expecting blank response
+            {longName, "list", true}, //expecting blank response
+            {specialName, "submit", false}, {utf8Name, "submit", false},
+            {longName, "submit", true},
+            {"pipeline0,pipeline1,pipeline2,pipeline3,pipeline4,pipeline5,pipeline6,pipeline7,"
+                 +"pipeline8,pipeline9,pipeline10,pipeline11", "submit", true}};
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDownClass() throws IOException {
+        cleanTestDirs();
+    }
+}