You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/25 08:00:34 UTC
git commit: FALCON-589 Add test cases for various feed operations on
Hcat feeds contributed by Karishma Gulati
Repository: incubator-falcon
Updated Branches:
refs/heads/master d5fdb3294 -> 5000fbbd6
FALCON-589 Add test cases for various feed operations on Hcat feeds contributed by Karishma Gulati
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5000fbbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5000fbbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5000fbbd
Branch: refs/heads/master
Commit: 5000fbbd687998769cd9aac183a5373ed49ae267
Parents: d5fdb32
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Mon Aug 25 11:30:20 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Mon Aug 25 11:30:20 2014 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +-
.../regression/hcat/HCatFeedOperationsTest.java | 282 +++++++++++++++++++
2 files changed, 284 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5000fbbd/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 929a881..1357808 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,7 +5,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
-
+ FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G
+ via Samarth Gupta)
IMPROVEMENTS
FALCON-619 ELExp_FutureAndLatestTest stabilization (Paul Isaychuk via Arpit Gupta)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5000fbbd/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
new file mode 100644
index 0000000..d994780
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HCatFeedOperationsTest extends BaseTestClass {
+
+ ColoHelper cluster = servers.get(0);
+ OozieClient clusterOC = serverOC.get(0);
+ HCatClient clusterHC = cluster.getClusterHelper().getHCatClient();
+
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster2OC = serverOC.get(1);
+ HCatClient cluster2HC = cluster2.getClusterHelper().getHCatClient();
+
+ private String dbName = "default";
+ private String tableName = "hcatFeedOperationsTest";
+ private String randomTblName = "randomTable_HcatFeedOperationsTest";
+ private String feed;
+ private String aggregateWorkflowDir = baseHDFSDir + "/HCatFeedOperationsTest/aggregator";
+ private static final Logger LOGGER = Logger.getLogger(HCatFeedOperationsTest.class);
+
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ //create an empty table for feed operations
+ ArrayList<HCatFieldSchema> partitions = new ArrayList<HCatFieldSchema>();
+ partitions.add(HCatUtil.getStringSchema("year", "yearPartition"));
+ createEmptyTable(clusterHC, dbName, tableName, partitions);
+
+ //A random table to test submission of replication feed when table doesn't exist on target
+ createEmptyTable(clusterHC, dbName, randomTblName, partitions);
+
+ //create empty table on target cluster
+ createEmptyTable(cluster2HC, dbName, tableName, new ArrayList<HCatFieldSchema>());
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ Bundle bundle = BundleUtil.readHCatBundle();
+ bundles[0] = new Bundle(bundle, cluster.getPrefix());
+ bundles[0].generateUniqueBundle();
+ bundles[0].setClusterInterface(Interfacetype.REGISTRY, cluster.getClusterHelper().getHCatEndpoint());
+
+
+ bundles[1] = new Bundle(bundle, cluster2.getPrefix());
+ bundles[1].generateUniqueBundle();
+ bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper().getHCatEndpoint());
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws HCatException {
+ removeBundles();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void deleteTable() throws HCatException {
+ clusterHC.dropTable(dbName, tableName, true);
+ clusterHC.dropTable(dbName, randomTblName, true);
+ cluster2HC.dropTable(dbName, tableName, true);
+ }
+
+ /**
+ * Submit Hcat feed when Hcat table mentioned in table uri does not exist. Response should reflect failure.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void submitFeedWhenTableDoesNotExist() throws Exception {
+ Bundle.submitCluster(bundles[1]);
+ feed = bundles[1].getInputFeedFromBundle();
+ FeedMerlin feedObj = new FeedMerlin(feed);
+ feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue());
+ ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+ AssertUtil.assertFailed(response);
+ }
+
+ /**
+ * Submit Hcat feed when Hcat table mentioned in table uri exists. Delete that feed, and re-submit.
+ * All responses should reflect success.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void submitFeedPostDeletionWhenTableExists() throws Exception {
+ Bundle.submitCluster(bundles[0]);
+ feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feedObj = new FeedMerlin(feed);
+ feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue());
+ ServiceResponse response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+
+ response = prism.getFeedHelper().delete(Util.URLS.DELETE_URL, feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+
+ response = prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+ }
+
+ /**
+ * Submit Hcat Replication feed when Hcat table mentioned in table uri does not exist on target. The response is
+ * Partial, with a successful submit/schedule on source.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void submitAndScheduleReplicationFeedWhenTableDoesNotExistOnTarget() throws Exception {
+ Bundle.submitCluster(bundles[0], bundles[1]);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ String tableUri = "catalog:" + dbName + ":" + randomTblName + "#year=${YEAR}";
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
+ AssertUtil.assertPartial(
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+ feed)
+ );
+ }
+
+ /**
+ * Submit Hcat Replication feed when Hcat table mentioned in table uri exists on both source and target. The response is
+ * Succeeded, and a replication coordinator should appear on the target Oozie. The test however does not ensure that
+ * replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception {
+ Bundle.submitCluster(bundles[0], bundles[1]);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ String tableUri = "catalog:" + dbName + ":" + tableName + "#year=${YEAR}";
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+ feed)
+ );
+ Assert.assertEquals(InstanceUtil
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ "REPLICATION"), 1);
+ //This test doesn't wait for replication to succeed.
+ }
+
+ /**
+ * Submit Hcat Replication feed. Suspend the feed, and check that feed was suspended on
+ * both clusters. Now resume feed, and check that status is running on both clusters.
+ * The test however does not ensure that replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void suspendAndResumeReplicationFeed() throws Exception {
+
+ submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL,
+ feed)
+ );
+
+ //check that feed suspended on both clusters
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().resume(Util.URLS.RESUME_URL,
+ feed)
+ );
+
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.RUNNING);
+ }
+
+ /**
+ * Submit Hcat Replication feed. Delete the feed, and check that feed was deleted on
+ * both clusters. The test however does not ensure that replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void deleteReplicationFeed() throws Exception {
+ submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().delete(Util.URLS.DELETE_URL,
+ feed)
+ );
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.KILLED);
+ }
+
+
+ public static void createEmptyTable(HCatClient cli, String dbName, String tabName, List<HCatFieldSchema> partitionCols) throws HCatException{
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema("id", "id comment"));
+ HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+ .create(dbName, tabName, cols)
+ .partCols(partitionCols)
+ .fileFormat("textfile")
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .build();
+ cli.createTable(tableDesc);
+ }
+}