Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/25 08:00:34 UTC

git commit: FALCON-589 Add test cases for various feed operations on Hcat feeds contributed by Karishma Gulati

Repository: incubator-falcon
Updated Branches:
  refs/heads/master d5fdb3294 -> 5000fbbd6


FALCON-589 Add test cases for various feed operations on Hcat feeds contributed by Karishma Gulati


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5000fbbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5000fbbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5000fbbd

Branch: refs/heads/master
Commit: 5000fbbd687998769cd9aac183a5373ed49ae267
Parents: d5fdb32
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Mon Aug 25 11:30:20 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Mon Aug 25 11:30:20 2014 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +-
 .../regression/hcat/HCatFeedOperationsTest.java | 282 +++++++++++++++++++
 2 files changed, 284 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
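The tests below exercise feeds whose storage is an HCatalog table rather than an HDFS path. Throughout the new test class, the table is addressed with a catalog table URI of the form catalog:<database>:<table>#<partition-key>=<value>. A minimal sketch (names taken from the test class; the XML fragment is illustrative and not part of this commit):

    // Catalog table URI as assembled in the replication tests below.
    // ${YEAR} is a Falcon expression, expanded per feed instance.
    String tableUri = "catalog:default:hcatFeedOperationsTest#year=${YEAR}";

    // In the feed entity, such a URI takes the place of HDFS locations, roughly:
    //   <table uri="catalog:default:hcatFeedOperationsTest#year=${YEAR}" />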


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5000fbbd/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 929a881..1357808 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,7 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
-
+   FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G 
+   via Samarth Gupta)
   IMPROVEMENTS
    FALCON-619 ELExp_FutureAndLatestTest stabilization (Paul Isaychuk via Arpit Gupta)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5000fbbd/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
new file mode 100644
index 0000000..d994780
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HCatFeedOperationsTest extends BaseTestClass {
+
+    ColoHelper cluster = servers.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+    HCatClient clusterHC = cluster.getClusterHelper().getHCatClient();
+
+    ColoHelper cluster2 = servers.get(1);
+    OozieClient cluster2OC = serverOC.get(1);
+    HCatClient cluster2HC = cluster2.getClusterHelper().getHCatClient();
+
+    private String dbName = "default";
+    private String tableName = "hcatFeedOperationsTest";
+    private String randomTblName = "randomTable_HcatFeedOperationsTest";
+    private String feed;
+    private String aggregateWorkflowDir = baseHDFSDir + "/HCatFeedOperationsTest/aggregator";
+    private static final Logger LOGGER = Logger.getLogger(HCatFeedOperationsTest.class);
+
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        //create an empty table for feed operations
+        ArrayList<HCatFieldSchema> partitions = new ArrayList<HCatFieldSchema>();
+        partitions.add(HCatUtil.getStringSchema("year", "yearPartition"));
+        createEmptyTable(clusterHC, dbName, tableName, partitions);
+
+        //A random table to test submission of replication feed when table doesn't exist on target
+        createEmptyTable(clusterHC, dbName, randomTblName, partitions);
+
+        //create empty table on target cluster
+        createEmptyTable(cluster2HC, dbName, tableName, new ArrayList<HCatFieldSchema>());
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readHCatBundle();
+        bundles[0] = new Bundle(bundle, cluster.getPrefix());
+        bundles[0].generateUniqueBundle();
+        bundles[0].setClusterInterface(Interfacetype.REGISTRY, cluster.getClusterHelper().getHCatEndpoint());
+
+
+        bundles[1] = new Bundle(bundle, cluster2.getPrefix());
+        bundles[1].generateUniqueBundle();
+        bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper().getHCatEndpoint());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws HCatException {
+        removeBundles();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void deleteTable() throws HCatException {
+        clusterHC.dropTable(dbName, tableName, true);
+        clusterHC.dropTable(dbName, randomTblName, true);
+        cluster2HC.dropTable(dbName, tableName, true);
+    }
+
+    /**
+     * Submit an HCat feed when the HCat table mentioned in the table URI does not exist. The response should
+     * reflect failure.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void submitFeedWhenTableDoesNotExist() throws Exception {
+        Bundle.submitCluster(bundles[1]);
+        feed = bundles[1].getInputFeedFromBundle();
+        FeedMerlin feedObj = new FeedMerlin(feed);
+        feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue());
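+        // randomTblName was created only on cluster 1's metastore, so this submit is expected to fail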
+        ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+        AssertUtil.assertFailed(response);
+    }
+
+    /**
+     * Submit an HCat feed when the HCat table mentioned in the table URI exists. Delete that feed, and re-submit it.
+     * All responses should reflect success.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void submitFeedPostDeletionWhenTableExists() throws Exception {
+        Bundle.submitCluster(bundles[0]);
+        feed = bundles[0].getInputFeedFromBundle();
+        FeedMerlin feedObj = new FeedMerlin(feed);
+        feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue());
+        ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+        AssertUtil.assertSucceeded(response);
+
+        response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feedObj.toString());
+        AssertUtil.assertSucceeded(response);
+
+        response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+        AssertUtil.assertSucceeded(response);
+    }
+
+    /**
+     * Submit HCat replication feed when the HCat table mentioned in the table URI does not exist on the target.
+     * The response should be partial, with a successful submit/schedule on the source only.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void submitAndScheduleReplicationFeedWhenTableDoesNotExistOnTarget() throws Exception {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2099-01-01T00:00Z";
+        String tableUri = "catalog:" + dbName + ":" + randomTblName + "#year=${YEAR}";
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        bundles[0].setInputFeedTableUri(tableUri);
+
+        feed = bundles[0].getDataSets().get(0);
+        // set cluster 2 as the target.
+        feed = InstanceUtil.setFeedClusterWithTable(feed,
+                XmlUtil.createValidity(startDate, endDate),
+                XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                tableUri);
+
+        AssertUtil.assertPartial(
+                prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                        feed)
+        );
+    }
+
+    /**
+     * Submit HCat replication feed when the HCat table mentioned in the table URI exists on both source and target.
+     * The response should be succeeded, and a replication coordinator should appear on the target Oozie. The test,
+     * however, does not ensure that replication goes through.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2099-01-01T00:00Z";
+        String tableUri = "catalog:" + dbName + ":" + tableName + "#year=${YEAR}";
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        bundles[0].setInputFeedTableUri(tableUri);
+
+        feed = bundles[0].getDataSets().get(0);
+        // set cluster 2 as the target.
+        feed = InstanceUtil.setFeedClusterWithTable(feed,
+                XmlUtil.createValidity(startDate, endDate),
+                XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+                tableUri);
+
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                        feed)
+        );
+        Assert.assertEquals(InstanceUtil
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                        "REPLICATION"), 1);
+        //This test doesn't wait for replication to succeed.
+    }
+
+    /**
+     * Submit HCat replication feed. Suspend the feed, and check that the feed was suspended on
+     * both clusters. Then resume the feed, and check that its status is RUNNING on both clusters.
+     * The test, however, does not ensure that replication goes through.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void suspendAndResumeReplicationFeed() throws Exception {
+
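+        // Reuse the submit-and-schedule test above to get a replication feed running on both clusters.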
+        submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL,
+                        feed)
+        );
+
+        //check that feed suspended on both clusters
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().resume(Util.URLS.RESUME_URL,
+                        feed)
+        );
+
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.RUNNING);
+    }
+
+    /**
+     * Submit HCat replication feed. Delete the feed, and check that the feed was deleted on
+     * both clusters. The test, however, does not ensure that replication goes through.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void deleteReplicationFeed() throws Exception {
+        submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+        AssertUtil.assertSucceeded(
+                prism.getFeedHelper().delete(Util.URLS.DELETE_URL,
+                        feed)
+        );
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
+        AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.KILLED);
+    }
+
+
+    public static void createEmptyTable(HCatClient cli, String dbName, String tabName,
+                                        List<HCatFieldSchema> partitionCols) throws HCatException {
+
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema("id", "id comment"));
+        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+                .create(dbName, tabName, cols)
+                .partCols(partitionCols)
+                .fileFormat("textfile")
+                .ifNotExists(true)
+                .isTableExternal(true)
+                .build();
+        cli.createTable(tableDesc);
+    }
+}
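To reuse the createEmptyTable helper outside the TestNG harness, a minimal sketch follows. The metastore URI is a hypothetical placeholder; HCatClient.create(Configuration) and the hive.metastore.uris key are the standard HCatalog client entry points.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.api.HCatClient;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;

    import java.util.ArrayList;

    public class CreateTableDemo {
        public static void main(String[] args) throws HCatException {
            Configuration conf = new Configuration();
            // Hypothetical metastore endpoint; point this at a reachable Hive metastore.
            conf.set("hive.metastore.uris", "thrift://localhost:9083");
            HCatClient client = HCatClient.create(conf);
            // Creates an external, unpartitioned "textfile" table in the default database.
            HCatFeedOperationsTest.createEmptyTable(client, "default", "demoTable",
                    new ArrayList<HCatFieldSchema>());
            client.close();
        }
    }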