You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2015/07/29 19:09:42 UTC
[3/5] falcon git commit: FALCON-1319: Contribute HiveDr,
Mirror tests and some test fixes contributed by Namit Maheshwari,
Paul Isaychuk, Raghav Kumar Gautam & Ruslan Ostafiychuk
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
new file mode 100644
index 0000000..1c788a3
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
@@ -0,0 +1,700 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HiveAssert;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.testng.asserts.SoftAssert;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalPartitionedTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createPartitionedTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createSerDeTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;
+
+/**
+ * Hive DR Testing.
+ */
+@Test(groups = "embedded")
+public class HiveDRTest extends BaseTestClass {
+ private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class);
+ private static final String DB_NAME = "hdr_sdb1";
+ private final ColoHelper cluster = servers.get(0);
+ private final ColoHelper cluster2 = servers.get(1);
+ private final ColoHelper cluster3 = servers.get(2);
+ private final FileSystem clusterFS = serverFS.get(0);
+ private final FileSystem clusterFS2 = serverFS.get(1);
+ private final FileSystem clusterFS3 = serverFS.get(2);
+ private final OozieClient clusterOC = serverOC.get(0);
+ private final OozieClient clusterOC2 = serverOC.get(1);
+ private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HiveDR/";
+ private HCatClient clusterHC;
+ private HCatClient clusterHC2;
+ private RecipeMerlin recipeMerlin;
+ private Connection connection;
+ private Connection connection2;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ clusterHC = cluster.getClusterHelper().getHCatClient();
+ clusterHC2 = cluster2.getClusterHelper().getHCatClient();
+ bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
+ bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
+ bundles[0].generateUniqueBundle(this);
+ bundles[1].generateUniqueBundle(this);
+ final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+ final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+ Bundle.submitCluster(bundles[0]);
+
+ if (MerlinConstants.IS_SECURE) {
+ recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe",
+ FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+ .withRecipeCluster(srcCluster);
+ } else {
+ recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe",
+ FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+ .withRecipeCluster(srcCluster);
+ }
+ recipeMerlin.withSourceCluster(srcCluster)
+ .withTargetCluster(tgtCluster)
+ .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+ .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(5));
+ recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+ connection = cluster.getClusterHelper().getHiveJdbcConnection();
+ runSql(connection, "drop database if exists hdr_sdb1 cascade");
+ runSql(connection, "create database hdr_sdb1");
+ runSql(connection, "use hdr_sdb1");
+
+ connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
+ runSql(connection2, "drop database if exists hdr_sdb1 cascade");
+ runSql(connection2, "create database hdr_sdb1");
+ runSql(connection2, "use hdr_sdb1");
+ }
+
+ @Test
+ public void drPartition() throws Exception {
+ final String tblName = "partitionDR";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ runSql(connection,
+ "create table " + tblName + "(comment string) partitioned by (pname string)");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'DELETE') values"
+ + "('this partition is going to be deleted - should NOT appear after dr')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'REPLACE') values"
+ + "('this partition is going to be replaced - should NOT appear after dr')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+ + "('this partition will have more data - should appear after dr')");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'NEW_PART') values"
+ + "('this partition has been added post bootstrap - should appear after dr')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+ + "('more data has been added post bootstrap - should appear after dr')");
+ runSql(connection,
+ "alter table " + tblName + " drop partition(pname = 'DELETE')");
+ runSql(connection,
+ "alter table " + tblName + " drop partition(pname = 'REPLACE')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'REPLACE') values"
+ + "('this partition has been replaced - should appear after dr')");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ @Test
+ public void drInsertOverwritePartition() throws Exception {
+ final String tblName = "drInsertOverwritePartition";
+ final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+ runSql(connection, "create table " + hlpTblName + "(comment string)");
+ runSql(connection,
+ "insert into table " + hlpTblName
+ + " values('overwrite data - should appear after dr')");
+ runSql(connection,
+ "insert into table " + hlpTblName + " values('newdata row2 - should appear after dr')");
+ runSql(connection,
+ "insert into table " + hlpTblName + " values('newdata row1 - should appear after dr')");
+
+ runSql(connection,
+ "create table " + tblName + "(comment string) partitioned by (pname string)");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'OLD_PART') values"
+ + "('this data should be retained - should appear after dr')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname = 'OVERWRITE_PART') values"
+ + "('this data should get overwritten - should NOT appear after dr')");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ runSql(connection,
+ "insert overwrite table " + tblName + " partition (pname = 'OVERWRITE_PART') "
+ + "select * from " + hlpTblName + " where comment REGEXP '^overwrite'");
+ runSql(connection,
+ "insert overwrite table " + tblName + " partition (pname = 'NEW_DATA') "
+ + "select * from " + hlpTblName + " where comment REGEXP '^newdata'");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ @Test
+ public void drTwoTablesOneRequest() throws Exception {
+ final String tblName = "firstTableDR";
+ final String tbl2Name = "secondTableDR";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName + ',' + tbl2Name);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ runSql(connection,
+ "create table " + tblName + "(comment string)");
+ runSql(connection,
+ "create table " + tbl2Name + "(comment string)");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+ bootstrapCopy(connection, clusterFS, tbl2Name, connection2, clusterFS2, tbl2Name);
+
+ runSql(connection,
+ "insert into table " + tblName + " values"
+ + "('this string has been added post bootstrap - should appear after dr')");
+ runSql(connection,
+ "insert into table " + tbl2Name + " values"
+ + "('this string has been added post bootstrap - should appear after dr')");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tbl2Name),
+ cluster2, clusterHC2.getTable(DB_NAME, tbl2Name), anAssert);
+ anAssert.assertAll();
+
+ }
+
+ @Test
+ public void drSerDeWithProperties() throws Exception {
+ final String tblName = "serdeTable";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ runSql(connection,
+ "create table " + tblName + "(comment string) "
+ + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ runSql(connection,
+ "insert into table " + tblName + " values"
+ + "('this string has been added post bootstrap - should appear after dr')");
+
+ runSql(connection,
+ "ALTER TABLE " + tblName + " SET SERDEPROPERTIES ('someProperty' = 'value')");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+
+ }
+
+ @Test
+ public void drChangeColumn() throws Exception {
+ final String tblName = "tableForColumnChange";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command1 = recipeMerlin.getSubmissionCommand();
+ final String recipe1Name = recipeMerlin.getName();
+ runSql(connection,
+ "create table " + tblName + "(id int)");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
+ runSql(connection,
+ "ALTER TABLE " + tblName + " CHANGE id id STRING COMMENT 'some_comment'");
+
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+
+ @Test
+ public void drTwoDstTablesTwoRequests() throws Exception {
+ final HCatClient clusterHC3 = cluster3.getClusterHelper().getHCatClient();
+ final Connection connection3 = cluster3.getClusterHelper().getHiveJdbcConnection();
+ runSql(connection3, "drop database if exists hdr_sdb1 cascade");
+ runSql(connection3, "create database hdr_sdb1");
+ runSql(connection3, "use hdr_sdb1");
+
+ final String tblName = "vanillaTable";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final String recipe1Name = recipeMerlin.getName();
+ final List<String> command1 = recipeMerlin.getSubmissionCommand();
+
+ final Bundle bundle = BundleUtil.readHCatBundle();
+ bundle.generateUniqueBundle(this);
+ recipeMerlin.withTargetCluster(new Bundle(bundle, cluster3).getClusterElement());
+ recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+ final List<String> command2 = recipeMerlin.getSubmissionCommand();
+ final String recipe2Name = recipeMerlin.getName();
+
+ runSql(connection, "create table " + tblName + "(comment string)");
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+ bootstrapCopy(connection, clusterFS, tblName, connection3, clusterFS3, tblName);
+
+ runSql(connection,
+ "insert into table " + tblName + " values"
+ + "('this string has been added post bootstrap - should appear after dr')");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
+ Assert.assertEquals(Bundle.runFalconCLI(command2), 0, "Recipe submission failed.");
+
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipe2Name, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster3, clusterHC3.getTable(DB_NAME, tblName), anAssert);
+ anAssert.assertAll();
+ }
+
+ @Test
+ public void drExternalToNonExternal() throws Exception {
+ final String tblName = "externalToNonExternal";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ createExternalTable(connection, clusterFS, baseTestHDFSDir + "click_data/", tblName);
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ //change column name
+ runSql(connection,
+ "alter table " + tblName + " change column data data_new string");
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+ anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
+ clusterHC.getTable(DB_NAME, tblName).getTableName(),
+ "Source and destination tables should have different Tabletype");
+ anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+ clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+ "Source and destination tables should have different value of property EXTERNAL");
+ anAssert.assertAll();
+ }
+
+ @Test
+ public void drExtPartitionedToNonExtPartitioned() throws Exception {
+ final String tblName = "extPartitionedToNonExtPartitioned";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ createExternalPartitionedTable(connection, clusterFS,
+ baseTestHDFSDir + "click_data/", tblName);
+ runSql(connection2,
+ "create table " + tblName + " (data string, time string) partitioned by (date_ string)");
+ runSql(connection2, "alter table " + tblName + " add partition "
+ + "(date_='2001-01-01') location '" + baseTestHDFSDir + "click_data/2001-01-01/'");
+ runSql(connection2, "alter table " + tblName + " add partition "
+ + "(date_='2001-01-02') location '" + baseTestHDFSDir + "click_data/2001-01-02/'");
+
+ runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-01') "
+ + "values ('click1', '01:01:01')");
+ runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-02') "
+ + "values ('click2', '02:02:02')");
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ //change column name
+ runSql(connection,
+ "alter table " + tblName + " change column data data_new string");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+ anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
+ clusterHC.getTable(DB_NAME, tblName).getTableName(),
+ "Source and destination tables should have different Tabletype");
+ anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+ clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+ "Source and destination tables should have different value of property EXTERNAL");
+ anAssert.assertAll();
+ }
+
+ /**
+ * 1 src tbl 1 dst tbl. Change table properties and comment at the source.
+ * Changes should get reflected at destination.
+ */
+ @Test
+ public void drChangeCommentAndPropertyTest() throws Exception {
+ final String tblName = "myTable";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ runSql(connection, "create table " + tblName + "(field string)");
+ //add new table property
+ runSql(connection,
+ "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'initialValue')");
+ //set comment
+ runSql(connection,
+ "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment will be "
+ + "changed, SHOULD NOT appear')");
+
+ LOGGER.info(tblName + " before bootstrap copy: ");
+ runSql(connection, "describe extended " + tblName);
+
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ //change table property and comment
+ runSql(connection,
+ "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'anotherValue')");
+ runSql(connection,
+ "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment should "
+ + "appear after replication done')");
+
+ LOGGER.info(tblName + " after modifications, before replication: ");
+ runSql(connection, "describe extended " + tblName);
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ @Test
+ public void dataGeneration() throws Exception {
+ runSql(connection, "use hdr_sdb1");
+ createVanillaTable(connection, "store_sales");
+ createSerDeTable(connection);
+ createPartitionedTable(connection);
+ createExternalTable(connection, clusterFS,
+ baseTestHDFSDir + "click_data/", "click_data");
+ createExternalPartitionedTable(connection, clusterFS,
+ baseTestHDFSDir + "click_data2/", "click_data2");
+
+ runSql(connection2, "use hdr_sdb1");
+ createVanillaTable(connection2, "store_sales");
+ createSerDeTable(connection2);
+ createPartitionedTable(connection2);
+ createExternalTable(connection2, clusterFS2,
+ baseTestHDFSDir + "click_data/", "click_data");
+ createExternalPartitionedTable(connection2, clusterFS2,
+ baseTestHDFSDir + "click_data2/", "click_data2");
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("hdr_sdb1"),
+ cluster2, clusterHC2.getDatabase("hdr_sdb1"), anAssert);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable("hdr_sdb1", "click_data"),
+ cluster2, clusterHC2.getTable("hdr_sdb1", "click_data"), anAssert);
+ anAssert.assertAll();
+
+ }
+
+ @Test(enabled = false)
+ public void assertionTest() throws Exception {
+ final SoftAssert anAssert = new SoftAssert();
+ HiveAssert.assertTableEqual(
+ cluster, clusterHC.getTable("default", "hcatsmoke10546"),
+ cluster2, clusterHC2.getTable("default", "hcatsmoke10548"), anAssert);
+ HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("default"), cluster2,
+ clusterHC2.getDatabase("default"), anAssert);
+ anAssert.assertAll();
+ }
+
+ /**
+ * Test creates a table on first cluster using static partitioning. Then it creates the same
+ * table on the second cluster using dynamic partitioning. Finally it checks the equality of
+ * these tables.
+ * @throws SQLException
+ * @throws IOException
+ */
+ @Test
+ public void dynamicPartitionsTest() throws SQLException, IOException {
+ //create table with static partitions on first cluster
+ createPartitionedTable(connection, false);
+
+ //create table with dynamic partitions on second cluster
+ createPartitionedTable(connection2, true);
+
+ //check that both tables are equal
+ HiveAssert.assertTableEqual(
+ cluster, clusterHC.getTable("hdr_sdb1", "global_store_sales"),
+ cluster2, clusterHC2.getTable("hdr_sdb1", "global_store_sales"), new SoftAssert()
+ ).assertAll();
+ }
+
+ /**
+ * 1 src tbl 1 dst tbl replication. Insert/delete/replace partitions using dynamic partition
+ * queries. The changes should get reflected at destination.
+ */
+ @Test
+ public void drInsertDropReplaceDynamicPartition() throws Exception {
+ final String tblName = "dynamicPartitionDR";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ //disable strict mode to use only dynamic partition
+ runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+
+ runSql(connection,
+ "create table " + tblName + "(comment string) partitioned by (pname string)");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this partition is going to be deleted - should NOT appear after dr', 'DELETE')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this partition is going to be replaced - should NOT appear after dr', 'REPLACE')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this partition will have more data - should appear after dr', 'ADD_DATA')");
+
+ LOGGER.info(tblName + " before bootstrap copying: ");
+ runSql(connection, "select * from " + tblName);
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this partition has been added post bootstrap - should appear after dr', 'NEW_PART')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('more data has been added post bootstrap - should appear after dr', 'ADD_DATA')");
+ runSql(connection,
+ "alter table " + tblName + " drop partition(pname = 'DELETE')");
+ runSql(connection,
+ "alter table " + tblName + " drop partition(pname = 'REPLACE')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this partition has been replaced - should appear after dr', 'REPLACE')");
+
+ LOGGER.info(tblName + " after modifications, before replication: ");
+ runSql(connection, "select * from " + tblName);
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ /**
+ * 1 src tbl 1 dst tbl replication. Insert/overwrite partitions using dynamic partitions
+ * queries. The changes should get reflected at destination.
+ * @throws Exception
+ */
+ @Test
+ public void drInsertOverwriteDynamicPartition() throws Exception {
+ final String tblName = "drInsertOverwritePartition";
+ final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ //disable strict mode to use only dynamic partition
+ runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+
+ runSql(connection,
+ "create table " + hlpTblName + "(comment string) partitioned by (pname string)");
+ runSql(connection,
+ "insert into table " + hlpTblName + " partition (pname)"
+ + " values('overwrite data - should appear after dr', 'OVERWRITE_PART')");
+ runSql(connection,
+ "insert into table " + hlpTblName + " partition (pname)"
+ + " values('newdata row2 - should appear after dr', 'NEW_DATA')");
+ runSql(connection,
+ "insert into table " + hlpTblName + " partition (pname)"
+ + " values('newdata row1 - should appear after dr', 'NEW_DATA')");
+
+ runSql(connection,
+ "create table " + tblName + "(comment string) partitioned by (pname string)");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this data should be retained - should appear after dr', 'OLD_PART')");
+ runSql(connection,
+ "insert into table " + tblName + " partition (pname) values"
+ + "('this data should get overwritten - should NOT appear after dr', 'OVERWRITE_PART')");
+
+ LOGGER.info(tblName + " before bootstrap copying: ");
+ runSql(connection, "select * from " + tblName);
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ runSql(connection,
+ "insert overwrite table " + tblName + " partition (pname) "
+ + "select comment, pname from " + hlpTblName + " where comment REGEXP '^overwrite'");
+ runSql(connection,
+ "insert overwrite table " + tblName + " partition (pname) "
+ + "select comment, pname from " + hlpTblName + " where comment REGEXP '^newdata'");
+
+ LOGGER.info(tblName + " after modifications, before replication: ");
+ runSql(connection, "select * from " + tblName);
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+ cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ /**
+ * Run recipe with different frequencies. Submission should go through.
+ * Check frequency of the launched oozie job
+ */
+ @Test(dataProvider = "frequencyGenerator")
+ public void differentRecipeFrequenciesTest(String frequency) throws Exception {
+ LOGGER.info("Testing with frequency: " + frequency);
+ String tblName = "myTable";
+ recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName)
+ .withFrequency(new Frequency(frequency));
+ runSql(connection, "create table " + tblName + "(comment string)");
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+ LOGGER.info("Submission went through.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ String filter = "name=FALCON_PROCESS_" + recipeMerlin.getName();
+ List<BundleJob> bundleJobs = OozieUtil.getBundles(clusterOC, filter, 0, 10);
+ List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
+ String bundleId = OozieUtil.getMaxId(bundleIds);
+ List<CoordinatorJob> coords = clusterOC.getBundleJobInfo(bundleId).getCoordinators();
+ List<String> cIds = new ArrayList<String>();
+ for (CoordinatorJob coord : coords) {
+ cIds.add(coord.getId());
+ }
+ String coordId = OozieUtil.getMinId(cIds);
+ CoordinatorJob job = clusterOC.getCoordJobInfo(coordId);
+ CoordinatorJob.Timeunit timeUnit = job.getTimeUnit();
+ String freq = job.getFrequency();
+ LOGGER.info("Frequency of running job: " + timeUnit + " " + freq);
+ Assert.assertTrue(frequency.contains(timeUnit.name().toLowerCase().replace("_", ""))
+ && frequency.contains(freq), "Running job has different frequency.");
+ }
+
+ @DataProvider(name = "frequencyGenerator")
+ public Object[][] frequencyGenerator() {
+ return new Object[][]{{"minutes(10)"}, {"minutes(10000)"},
+ {"days(3)"}, {"days(3000)"}, {"months(1)"}, {"months(1000)"}, };
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws IOException {
+ try {
+ prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
+ } catch (Exception e) {
+ LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
+ }
+ removeTestClassEntities();
+ cleanTestsDirs();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
new file mode 100644
index 0000000..a64bd6d
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.Config;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.HiveAssert;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;
+
+/**
+ * Hive DR Testing for Hive database replication.
+ */
+@Test(groups = "embedded")
+public class HiveDbDRTest extends BaseTestClass {
+ private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class);
+ private final ColoHelper cluster = servers.get(0);
+ private final ColoHelper cluster2 = servers.get(1);
+ private final FileSystem clusterFS = serverFS.get(0);
+ private final FileSystem clusterFS2 = serverFS.get(1);
+ private final OozieClient clusterOC = serverOC.get(0);
+ private HCatClient clusterHC;
+ private HCatClient clusterHC2;
+ private RecipeMerlin recipeMerlin;
+ private Connection connection;
+ private Connection connection2;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ clusterHC = cluster.getClusterHelper().getHCatClient();
+ clusterHC2 = cluster2.getClusterHelper().getHCatClient();
+ bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
+ bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
+ bundles[0].generateUniqueBundle(this);
+ bundles[1].generateUniqueBundle(this);
+ final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+ final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+ Bundle.submitCluster(bundles[0]);
+
+ if (MerlinConstants.IS_SECURE) {
+ recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe",
+ FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+ .withRecipeCluster(srcCluster);
+ } else {
+ recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe",
+ FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+ .withRecipeCluster(srcCluster);
+ }
+ recipeMerlin.withSourceCluster(srcCluster)
+ .withTargetCluster(tgtCluster)
+ .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+ .withValidity(TimeUtil.getTimeWrtSystemTime(-1), TimeUtil.getTimeWrtSystemTime(11));
+ recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+ connection = cluster.getClusterHelper().getHiveJdbcConnection();
+
+ connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
+ }
+
+ private void setUpDb(String dbName, Connection conn) throws SQLException {
+ runSql(conn, "drop database if exists " + dbName + " cascade");
+ runSql(conn, "create database " + dbName);
+ runSql(conn, "use " + dbName);
+ }
+
+ @Test
+ public void drDbDropDb() throws Exception {
+ final String dbName = "drDbDropDb";
+ setUpDb(dbName, connection);
+ setUpDb(dbName, connection2);
+ recipeMerlin.withSourceDb(dbName).withSourceTable("*");
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ runSql(connection, "drop database " + dbName);
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ final List<String> dstDbs = runSql(connection2, "show databases");
+ Assert.assertFalse(dstDbs.contains(dbName), "dstDbs = " + dstDbs + " was not expected to "
+ + "contain " + dbName);
+ }
+
+
+ @Test(dataProvider = "isDBReplication")
+ public void drDbFailPass(Boolean isDBReplication) throws Exception {
+ final String dbName = "drDbFailPass";
+ final String tblName = "vanillaTable";
+ final String hiveWarehouseLocation = Config.getProperty("hive.warehouse.location", "/apps/hive/warehouse/");
+ final String dbPath = HadoopUtil.joinPath(hiveWarehouseLocation, dbName.toLowerCase() + ".db");
+ setUpDb(dbName, connection);
+ runSql(connection, "create table " + tblName + "(data string)");
+ setUpDb(dbName, connection2);
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+ recipeMerlin.withSourceDb(dbName).withSourceTable(isDBReplication ? "*" : tblName);
+
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ runSql(connection, "insert into table " + tblName + " values('cannot be replicated now')");
+ final String noReadWritePerm = "d---r-xr-x";
+ LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + noReadWritePerm);
+ clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(noReadWritePerm));
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.KILLED, EntityType.PROCESS);
+
+ final String readWritePerm = "drwxr-xr-x";
+ LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + readWritePerm);
+ clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(readWritePerm));
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
+ cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)
+ ).assertAll();
+ }
+
+ @Test
+ public void drDbAddDropTable() throws Exception {
+ final String dbName = "drDbAddDropTable";
+ final String tblToBeDropped = "table_to_be_dropped";
+ final String tblToBeDroppedAndAdded = "table_to_be_dropped_and_readded";
+ final String newTableToBeAdded = "new_table_to_be_added";
+
+ setUpDb(dbName, connection);
+ setUpDb(dbName, connection2);
+ recipeMerlin.withSourceDb(dbName).withSourceTable("*")
+ .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ createVanillaTable(connection, tblToBeDropped);
+ createVanillaTable(connection, tblToBeDroppedAndAdded);
+ bootstrapCopy(connection, clusterFS, tblToBeDropped,
+ connection2, clusterFS2, tblToBeDropped);
+ bootstrapCopy(connection, clusterFS, tblToBeDroppedAndAdded,
+ connection2, clusterFS2, tblToBeDroppedAndAdded);
+
+ /* For first replication - two tables are dropped & one table is added */
+ runSql(connection, "drop table " + tblToBeDropped);
+ runSql(connection, "drop table " + tblToBeDroppedAndAdded);
+ createVanillaTable(connection, newTableToBeAdded);
+
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ final NotifyingAssert anAssert = new NotifyingAssert(true);
+ HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
+ cluster2, clusterHC2.getDatabase(dbName), anAssert);
+
+ /* For second replication - a dropped tables is added back */
+ createVanillaTable(connection, tblToBeDroppedAndAdded);
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
+ cluster2, clusterHC2.getDatabase(dbName), anAssert);
+ anAssert.assertAll();
+ }
+
+ @Test(enabled = false)
+ public void drDbNonReplicatableTable() throws Exception {
+ final String dbName = "drDbNonReplicatableTable";
+ final String tblName = "vanillaTable";
+ final String tblView = "vanillaTableView";
+ final String tblOffline = "offlineTable";
+
+ setUpDb(dbName, connection);
+ setUpDb(dbName, connection2);
+ recipeMerlin.withSourceDb(dbName).withSourceTable("*")
+ .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
+ final List<String> command = recipeMerlin.getSubmissionCommand();
+
+ createVanillaTable(connection, tblName);
+ runSql(connection, "create view " + tblView + " as select * from " + tblName);
+ createVanillaTable(connection, tblOffline);
+ bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+ bootstrapCopy(connection, clusterFS, tblOffline, connection2, clusterFS2, tblOffline);
+ final String newComment = "'new comment for offline table should not reach destination'";
+ runSql(connection,
+ "alter table " + tblOffline + " set tblproperties ('comment' =" + newComment +")");
+ runSql(connection, "alter table " + tblOffline + " enable offline");
+ Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+ InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+ //vanilla table gets replicated, offline table & view are not replicated
+ HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
+ cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)).assertAll();
+ final List<String> dstTables = runSql(connection2, "show tables");
+ Assert.assertFalse(dstTables.contains(tblView),
+ "dstTables = " + dstTables + " was not expected to contain " + tblView);
+ final List<String> dstComment =
+ runSql(connection2, "show tblproperties " + tblOffline + "('comment')");
+ Assert.assertFalse(dstComment.contains(newComment),
+ tblOffline + " comment = " + dstComment + " was not expected to contain " + newComment);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws IOException {
+ try {
+ prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
+ } catch (Exception e) {
+ LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
+ }
+ removeTestClassEntities();
+ cleanTestsDirs();
+ }
+
+ @DataProvider
+ public Object[][] isDBReplication() {
+ return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
new file mode 100644
index 0000000..9eb389a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.apache.falcon.regression.core.util.HadoopUtil.writeDataForHive;
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+
+/**
+ * Create Hive tables for testing Hive DR. Note that this is not expected to be used out of
+ * HiveDR tests.
+ */
+final class HiveObjectCreator {
+ private static final Logger LOGGER = Logger.getLogger(HiveObjectCreator.class);
+ private static final String HDFS_TMP_DIR = "/tmp/hive_objects/";
+
+ private HiveObjectCreator() {
+ throw new AssertionError("Instantiating utility class...");
+ }
+
+ static void bootstrapCopy(Connection srcConnection, FileSystem srcFs, String srcTable,
+ Connection dstConnection, FileSystem dstFs, String dstTable) throws Exception {
+ LOGGER.info("Starting bootstrap...");
+ final String dumpPath = HDFS_TMP_DIR + srcTable + "/";
+ HadoopUtil.recreateDir(srcFs, dumpPath);
+ runSqlQuietly(srcConnection, "dfs -chmod -R 777 " + dumpPath);
+ HadoopUtil.deleteDirIfExists(dumpPath, dstFs);
+ runSql(srcConnection, "export table " + srcTable + " to '" + dumpPath + "' FOR REPLICATION('ignore')");
+ FileUtil.copy(srcFs, new Path(dumpPath), dstFs, new Path(dumpPath), false, true, new Configuration());
+ runSqlQuietly(dstConnection, "dfs -chmod -R 777 " + dumpPath);
+ runSql(dstConnection, "import table " + dstTable + " from '" + dumpPath + "'");
+ HadoopUtil.deleteDirIfExists(dumpPath, srcFs);
+ HadoopUtil.deleteDirIfExists(dumpPath, dstFs);
+ LOGGER.info("Finished bootstrap");
+ }
+
+ /* We need to delete it using hive query as the created directory is owned by hive.*/
+ private static void runSqlQuietly(Connection srcConnection, String sql) {
+ try {
+ runSql(srcConnection, sql);
+ } catch (SQLException ex) {
+ LOGGER.info("Exception while hive ql execution: " + ex.getMessage());
+ }
+ }
+
+ /**
+ * Create an external table.
+ * @param connection jdbc connection object to use for issuing queries to hive
+ * @param fs filesystem object to upload the data
+ * @param clickDataLocation location to upload the data to
+ * @throws IOException
+ * @throws SQLException
+ */
+ static void createExternalTable(Connection connection, FileSystem fs, String
+ clickDataLocation, String tableName) throws IOException, SQLException {
+ HadoopUtil.deleteDirIfExists(clickDataLocation, fs);
+ fs.mkdirs(new Path(clickDataLocation));
+ fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault());
+ writeDataForHive(fs, clickDataLocation,
+ new StringBuffer("click1").append((char) 0x01).append("01:01:01").append("\n")
+ .append("click2").append((char) 0x01).append("02:02:02"), true);
+ //clusterFS.setPermission(new Path(clickDataPart2), FsPermission.getFileDefault());
+ runSql(connection, "create external table " + tableName
+ + " (data string, time string) "
+ + "location '" + clickDataLocation + "'");
+ runSql(connection, "select * from " + tableName);
+ }
+
+
+ /**
+ * Create an external table.
+ * @param connection jdbc connection object to use for issuing queries to hive
+ * @param fs filesystem object to upload the data
+ * @param clickDataLocation location to upload the data to
+ * @throws IOException
+ * @throws SQLException
+ */
+ static void createExternalPartitionedTable(Connection connection, FileSystem fs, String
+ clickDataLocation, String tableName) throws IOException, SQLException {
+ final String clickDataPart1 = clickDataLocation + "2001-01-01/";
+ final String clickDataPart2 = clickDataLocation + "2001-01-02/";
+ fs.mkdirs(new Path(clickDataLocation));
+ fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault());
+ writeDataForHive(fs, clickDataPart1,
+ new StringBuffer("click1").append((char) 0x01).append("01:01:01"), true);
+ writeDataForHive(fs, clickDataPart2,
+ new StringBuffer("click2").append((char) 0x01).append("02:02:02"), true);
+ //clusterFS.setPermission(new Path(clickDataPart2), FsPermission.getFileDefault());
+ runSql(connection, "create external table " + tableName
+ + " (data string, time string) partitioned by (date_ string) "
+ + "location '" + clickDataLocation + "'");
+ runSql(connection, "alter table " + tableName + " add partition "
+ + "(date_='2001-01-01') location '" + clickDataPart1 + "'");
+ runSql(connection, "alter table " + tableName + " add partition "
+ + "(date_='2001-01-02') location '" + clickDataPart2 + "'");
+ runSql(connection, "select * from " + tableName);
+ }
+
+ /**
+ * Create an partitioned table.
+ * @param connection jdbc connection object to use for issuing queries to hive
+ * @throws SQLException
+ */
+ static void createPartitionedTable(Connection connection) throws SQLException {
+ runSql(connection, "create table global_store_sales "
+ + "(customer_id string, item_id string, quantity float, price float, time timestamp) "
+ + "partitioned by (country string)");
+ runSql(connection,
+ "insert into table global_store_sales partition (country = 'us') values"
+ + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01')");
+ runSql(connection,
+ "insert into table global_store_sales partition (country = 'uk') values"
+ + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+ runSql(connection, "select * from global_store_sales");
+ }
+
+ /**
+ * Create an plain old table.
+ * @param connection jdbc connection object to use for issuing queries to hive
+ * @param tblName
+ * @throws SQLException
+ */
+ static void createVanillaTable(Connection connection, String tblName) throws SQLException {
+ //vanilla table
+ runSql(connection, "create table " + tblName
+ + "(customer_id string, item_id string, quantity float, price float, time timestamp)");
+ runSql(connection, "insert into table " + tblName + " values "
+ + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), "
+ + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+ runSql(connection, "select * from " + tblName);
+ }
+
+ /**
+ * Create a partitioned table with either dynamic or static partitions.
+ * @param connection jdbc connection object to use for issuing queries to hive
+ * @param dynamic should partitions be added in dynamic or static way
+ * @throws SQLException
+ */
+ static void createPartitionedTable(Connection connection,
+ boolean dynamic) throws SQLException {
+ String [][] partitions = {
+ {"us", "Kansas", },
+ {"us", "California", },
+ {"au", "Queensland", },
+ {"au", "Victoria", },
+ };
+ //create table
+ runSql(connection, "drop table global_store_sales");
+ runSql(connection, "create table global_store_sales(customer_id string,"
+ + " item_id string, quantity float, price float, time timestamp) "
+ + "partitioned by (country string, state string)");
+ //provide data
+ String query;
+ if (dynamic) {
+ //disable strict mode, thus both partitions can be used as dynamic
+ runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+ query = "insert into table global_store_sales partition"
+ + "(country, state) values('c%3$s', 'i%3$s', '%3$s', '%3$s', "
+ + "'2001-01-01 01:01:0%3$s', '%1$s', '%2$s')";
+ } else {
+ query = "insert into table global_store_sales partition"
+ + "(country = '%1$s', state = '%2$s') values('c%3$s', 'i%3$s', '%3$s', '%3$s', "
+ + "'2001-01-01 01:01:0%3$s')";
+ }
+ for (int i = 0; i < partitions.length; i++) {
+ runSql(connection, String.format(query, partitions[i][0], partitions[i][1], i + 1));
+ }
+ runSql(connection, "select * from global_store_sales");
+ }
+
+ static void createSerDeTable(Connection connection) throws SQLException {
+ runSql(connection, "create table store_json "
+ + "(customer_id string, item_id string, quantity float, price float, time timestamp) "
+ + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' ");
+ runSql(connection, "insert into table store_json values "
+ + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), "
+ + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+ runSql(connection, "select * from store_json");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 8ef6bb6..7ad4c8e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -88,7 +88,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
}
/*
- * Prepares running feed with instances ordered (desc): 1 waiting, 1 suspended, 1 running,
+ * Prepares running feed with instances ordered (desc): 1 waiting, 1 running, 1 suspended,
* 3 waiting and 6 killed. Testing is based on expected instances statuses.
*/
private void prepareScenario() throws AuthenticationException, IOException, URISyntaxException,
@@ -284,9 +284,9 @@ public class ListFeedInstancesTest extends BaseTestClass {
"start=" + TimeUtil.addMinsToTime(endTime, -5) + "&end=" + endTime, null);
InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
- //only start, actual feed startTime, should get 10 most recent instances(by default).
+ //only start, actual feed startTime, should get 1-10 instances(end is automatically set to freq*10).
r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null);
- InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+ InstanceUtil.validateResponse(r, 10, 0, 1, 3, 6);
//only start, greater then the actual startTime.
r = prism.getFeedHelper().listInstances(feedName,
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
index f35e12d..be8a631 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
@@ -19,7 +19,6 @@
package org.apache.falcon.regression.lineage;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.BundleUtil;
@@ -75,15 +74,12 @@ public class ListProcessInstancesTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(feedDataLocation);
bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output" + MINUTE_DATE_PATTERN);
bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(3);
bundles[0].submitAndScheduleProcess();
processName = bundles[0].getProcessName();
InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
//create data for processes to run and wait some time for instances to make progress
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
}
@@ -94,8 +90,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
}
/**
- * List process instances using orderBy - status, -startTime, -endTime params, expecting list of
- * process instances in the right order.
+ * List process instances using orderBy - status, -startTime, -endTime params.
*/
@Test
public void testProcessOrderBy() throws Exception {
@@ -172,12 +167,12 @@ public class ListProcessInstancesTest extends BaseTestClass {
//use start option without numResults. 10 instances expected
r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null);
- InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0);
+ InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0);
//use start option with numResults value which is smaller then default.
r = prism.getProcessHelper().listInstances(processName,
"start=" + startTime + "&numResults=8", null);
- InstanceUtil.validateResponse(r, 8, 3, 0, 5, 0);
+ InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0);
//use start option with numResults value greater then default. All 12 instances expected
r = prism.getProcessHelper().listInstances(processName,
@@ -242,8 +237,6 @@ public class ListProcessInstancesTest extends BaseTestClass {
InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
//wait till new instances be RUNNING and total status count be stable
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
@@ -300,7 +293,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
+ "&end=" + TimeUtil.addMinsToTime(startTime, 16), null);
InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
- //only start, actual startTime, should get 10 most recent instances
+ //only start, actual startTime (end is automatically set to start + frequency * 10)
r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null);
InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 4946b30..10ab192 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -87,7 +87,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
@BeforeMethod(alwaysRun = true)
public void testSetup() throws Exception {
- Bundle b = BundleUtil.readUpdateBundle();
+ final Bundle b = BundleUtil.readUpdateBundle();
bundles[0] = new Bundle(b, cluster1);
bundles[0].generateUniqueBundle(this);
bundles[1] = new Bundle(b, cluster2);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
index b7e6861..e8b7ed8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
@@ -68,7 +68,7 @@ public class PrismFeedDeleteTest extends BaseTestClass {
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
restartRequired = false;
- Bundle bundle = BundleUtil.readELBundle();
+ final Bundle bundle = BundleUtil.readELBundle();
bundles[0] = new Bundle(bundle, cluster1);
bundles[0].generateUniqueBundle(this);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
@@ -361,8 +361,10 @@ public class PrismFeedDeleteTest extends BaseTestClass {
@Test(groups = {"multiCluster"})
public void testServer1FeedDeleteNonExistentWhen1ColoIsDownDuringDelete() throws Exception {
restartRequired = true;
- bundles[0] = new Bundle(bundles[0], cluster1);
- bundles[1] = new Bundle(bundles[1], cluster2);
+ bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
+ bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
+ bundles[0].generateUniqueBundle(this);
+ bundles[1].generateUniqueBundle(this);
bundles[0].setCLusterColo(cluster1Colo);
LOGGER.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
index f90a76b..23878df 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -59,8 +59,7 @@ public class PrismSubmitTest extends BaseTestClass {
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
restartRequired = false;
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster1);
+ bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
bundles[0].generateUniqueBundle(this);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
}
@@ -308,7 +307,7 @@ public class PrismSubmitTest extends BaseTestClass {
@Test(groups = {"prism", "0.2", "distributed"})
public void submitClusterReSubmitAlreadyPartial() throws Exception {
restartRequired = true;
- bundles[1] = new Bundle(bundles[0], cluster2);
+ bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
bundles[1].generateUniqueBundle(this);
bundles[1].setProcessWorkflow(aggregateWorkflowDir);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index e10f8d1..afa01c1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -83,7 +83,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
@BeforeMethod(alwaysRun = true)
public void setup() throws IOException {
- Bundle bundle = BundleUtil.readFeedReplicationBundle();
+ final Bundle bundle = BundleUtil.readFeedReplicationBundle();
bundles[0] = new Bundle(bundle, cluster1);
bundles[1] = new Bundle(bundle, cluster2);
bundles[2] = new Bundle(bundle, cluster3);
@@ -92,8 +92,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
bundles[1].generateUniqueBundle(this);
bundles[2].generateUniqueBundle(this);
- processBundle = BundleUtil.readELBundle();
- processBundle = new Bundle(processBundle, cluster1);
+ processBundle = new Bundle(BundleUtil.readELBundle(), cluster1);
processBundle.generateUniqueBundle(this);
processBundle.setProcessWorkflow(aggregateWorkflowDir);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
index 63da183..c53e06b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
@@ -158,6 +158,9 @@ public class EntitiesTableTest extends BaseUITestClass {
@DataProvider
public Object[][] getBoolean() {
- return new Boolean[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+ return new Boolean[][]{
+ {Boolean.TRUE},
+ {Boolean.FALSE},
+ };
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
index 4c1e379..4ad775e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
@@ -588,8 +588,8 @@ public class EntityPageTest extends BaseUITestClass {
.getProcessInstanceLogs(process.getName(),
"start=" + nominalTimeOfSelectedInstance
+ "&end=" + TimeUtil.addMinsToTime(nominalTimeOfSelectedInstance, 1));
- Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?"),
- processInstanceLogs.getInstances()[0].getLogFile(),
+ Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?").toLowerCase(),
+ processInstanceLogs.getInstances()[0].getLogFile().toLowerCase(),
"Only one instance is selected. "
+ "Clicking instance log button should take user to oozie page.");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
index d8aed28..47b1d19 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
@@ -139,6 +139,7 @@ public class FeedSetupTest extends BaseUITestClass{
@Test
public void testWizardCancel() throws Exception {
// Step 1 - Check cancel on the first page - General Info Page
+ feedWizardPage.setFeedGeneralInfo(feed);
feedWizardPage.clickCancel();
searchPage.checkPage();
@@ -146,6 +147,7 @@ public class FeedSetupTest extends BaseUITestClass{
feedWizardPage = searchPage.getPageHeader().doCreateFeed();
feedWizardPage.setFeedGeneralInfo(feed);
feedWizardPage.clickNext();
+ feedWizardPage.setFeedPropertiesInfo(feed);
feedWizardPage.clickCancel();
searchPage.checkPage();
@@ -155,6 +157,7 @@ public class FeedSetupTest extends BaseUITestClass{
feedWizardPage.clickNext();
feedWizardPage.setFeedPropertiesInfo(feed);
feedWizardPage.clickNext();
+ feedWizardPage.setFeedLocationInfo(feed);
feedWizardPage.clickCancel();
searchPage.checkPage();
@@ -166,6 +169,7 @@ public class FeedSetupTest extends BaseUITestClass{
feedWizardPage.clickNext();
feedWizardPage.setFeedLocationInfo(feed);
feedWizardPage.clickNext();
+ feedWizardPage.setFeedClustersInfo(feed);
feedWizardPage.clickCancel();
searchPage.checkPage();
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
index f71739d..20864f6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
@@ -111,18 +111,26 @@ public class HomePageTest extends BaseUITestClass {
final String clusterXml = bundle.getClusterElement().toString();
homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(clusterXml));
+ String alert = homePage.getActiveAlertText();
+ Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'");
AssertUtil.assertSucceeded(prism.getClusterHelper().getEntityDefinition(clusterXml));
final String feedXml = bundle.getInputFeedFromBundle();
homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(feedXml));
+ alert = homePage.getActiveAlertText();
+ Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'");
AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(feedXml));
final String outputFeedXml = bundle.getOutputFeedFromBundle();
homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(outputFeedXml));
+ alert = homePage.getActiveAlertText();
+ Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'");
AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(outputFeedXml));
final String processXml = bundle.getProcessObject().toString();
homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(processXml));
+ alert = homePage.getActiveAlertText();
+ Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'");
AssertUtil.assertSucceeded(prism.getProcessHelper().getEntityDefinition(processXml));
}
@@ -161,7 +169,6 @@ public class HomePageTest extends BaseUITestClass {
writer.close();
homePage.getPageHeader().uploadXml(xmlFile.getAbsolutePath());
- Thread.sleep(1000);
alertText = homePage.getActiveAlertText();
Assert.assertEquals(alertText, "Invalid xml. File not uploaded",
"XML file with invalid text was allowed to be uploaded");
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
index 9ec936d..8598b18 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
@@ -22,7 +22,11 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.testHelper.BaseUITestClass;
import org.apache.falcon.regression.ui.search.EntityPage;
import org.apache.falcon.regression.ui.search.InstancePage;
@@ -33,9 +37,7 @@ import org.apache.falcon.resource.InstancesResult;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -59,14 +61,10 @@ public class InstancePageTest extends BaseUITestClass {
private String instance = "2010-01-02T01:00Z";
private String processName;
- @BeforeClass(alwaysRun = true)
- public void setup() {
- openBrowser();
- searchPage = LoginPage.open(getDriver()).doDefaultLogin();
- }
-
@BeforeMethod(alwaysRun = true)
public void submitEntities() throws Exception {
+ openBrowser();
+ searchPage = LoginPage.open(getDriver()).doDefaultLogin();
cleanAndGetTestDir();
HadoopUtil.uploadDir(serverFS.get(0), aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
bundles[0] = BundleUtil.readELBundle();
@@ -184,11 +182,7 @@ public class InstancePageTest extends BaseUITestClass {
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
- removeTestClassEntities();
- }
-
- @AfterClass(alwaysRun = true)
- public void tearDownClass() {
closeBrowser();
+ removeTestClassEntities();
}
}