Posted to commits@falcon.apache.org by ra...@apache.org on 2015/07/29 19:09:42 UTC

[3/5] falcon git commit: FALCON-1319: Contribute HiveDr, Mirror tests and some test fixes contributed by Namit Maheshwari, Paul Isaychuk, Raghav Kumar Gautam & Ruslan Ostafiychuk

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
new file mode 100644
index 0000000..1c788a3
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
@@ -0,0 +1,700 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HiveAssert;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.testng.asserts.SoftAssert;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalPartitionedTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createPartitionedTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createSerDeTable;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;
+
+/**
+ * Hive DR Testing.
+ */
+@Test(groups = "embedded")
+public class HiveDRTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class);
+    private static final String DB_NAME = "hdr_sdb1";
+    private final ColoHelper cluster = servers.get(0);
+    private final ColoHelper cluster2 = servers.get(1);
+    private final ColoHelper cluster3 = servers.get(2);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final FileSystem clusterFS2 = serverFS.get(1);
+    private final FileSystem clusterFS3 = serverFS.get(2);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private final OozieClient clusterOC2 = serverOC.get(1);
+    private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HiveDR/";
+    private HCatClient clusterHC;
+    private HCatClient clusterHC2;
+    private RecipeMerlin recipeMerlin;
+    private Connection connection;
+    private Connection connection2;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        clusterHC = cluster.getClusterHelper().getHCatClient();
+        clusterHC2 = cluster2.getClusterHelper().getHCatClient();
+        bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
+        bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+        final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+        Bundle.submitCluster(bundles[0]);
+
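+        // Pick the secure recipe template when running against secure clusters, the plain one otherwise.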
+        if (MerlinConstants.IS_SECURE) {
+            recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe",
+                FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+                .withRecipeCluster(srcCluster);
+        } else {
+            recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe",
+                FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+                .withRecipeCluster(srcCluster);
+        }
+        recipeMerlin.withSourceCluster(srcCluster)
+            .withTargetCluster(tgtCluster)
+            .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+            .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(5));
+        recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+        connection = cluster.getClusterHelper().getHiveJdbcConnection();
+        runSql(connection, "drop database if exists hdr_sdb1 cascade");
+        runSql(connection, "create database hdr_sdb1");
+        runSql(connection, "use hdr_sdb1");
+
+        connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
+        runSql(connection2, "drop database if exists hdr_sdb1 cascade");
+        runSql(connection2, "create database hdr_sdb1");
+        runSql(connection2, "use hdr_sdb1");
+    }
+
+    @Test
+    public void drPartition() throws Exception {
+        final String tblName = "partitionDR";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        runSql(connection,
+            "create table " + tblName + "(comment string) partitioned by (pname string)");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'DELETE') values"
+                + "('this partition is going to be deleted - should NOT appear after dr')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'REPLACE') values"
+                + "('this partition is going to be replaced - should NOT appear after dr')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+                + "('this partition will have more data - should appear after dr')");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'NEW_PART') values"
+                + "('this partition has been added post bootstrap - should appear after dr')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'ADD_DATA') values"
+                + "('more data has been added post bootstrap - should appear after dr')");
+        runSql(connection,
+            "alter table " + tblName + " drop partition(pname = 'DELETE')");
+        runSql(connection,
+            "alter table " + tblName + " drop partition(pname = 'REPLACE')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'REPLACE') values"
+                + "('this partition has been replaced - should appear after dr')");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void drInsertOverwritePartition() throws Exception {
+        final String tblName = "drInsertOverwritePartition";
+        final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+        runSql(connection, "create table " + hlpTblName + "(comment string)");
+        runSql(connection,
+            "insert into table " + hlpTblName
+                + " values('overwrite data - should appear after dr')");
+        runSql(connection,
+            "insert into table " + hlpTblName + " values('newdata row2 - should appear after dr')");
+        runSql(connection,
+            "insert into table " + hlpTblName + " values('newdata row1 - should appear after dr')");
+
+        runSql(connection,
+            "create table " + tblName + "(comment string) partitioned by (pname string)");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'OLD_PART') values"
+                + "('this data should be retained - should appear after dr')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname = 'OVERWRITE_PART') values"
+                + "('this data should get overwritten - should NOT appear after dr')");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        runSql(connection,
+            "insert overwrite table " + tblName + " partition (pname = 'OVERWRITE_PART') "
+                + "select * from " + hlpTblName + " where comment REGEXP '^overwrite'");
+        runSql(connection,
+            "insert overwrite table " + tblName + " partition (pname = 'NEW_DATA') "
+                + "select * from " + hlpTblName + " where comment REGEXP '^newdata'");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void drTwoTablesOneRequest() throws Exception {
+        final String tblName = "firstTableDR";
+        final String tbl2Name = "secondTableDR";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName + ',' + tbl2Name);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        runSql(connection,
+            "create table " + tblName + "(comment string)");
+        runSql(connection,
+            "create table " + tbl2Name + "(comment string)");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+        bootstrapCopy(connection, clusterFS, tbl2Name, connection2, clusterFS2, tbl2Name);
+
+        runSql(connection,
+            "insert into table " + tblName + " values"
+                + "('this string has been added post bootstrap - should appear after dr')");
+        runSql(connection,
+            "insert into table " + tbl2Name + " values"
+                + "('this string has been added post bootstrap - should appear after dr')");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tbl2Name),
+            cluster2, clusterHC2.getTable(DB_NAME, tbl2Name), anAssert);
+        anAssert.assertAll();
+    }
+
+    @Test
+    public void drSerDeWithProperties() throws Exception {
+        final String tblName = "serdeTable";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        runSql(connection,
+            "create table " + tblName + "(comment string) "
+                + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        runSql(connection,
+            "insert into table " + tblName + " values"
+                + "('this string has been added post bootstrap - should appear after dr')");
+
+        runSql(connection,
+            "ALTER TABLE " + tblName + " SET SERDEPROPERTIES ('someProperty' = 'value')");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void drChangeColumn() throws Exception {
+        final String tblName = "tableForColumnChange";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command1 = recipeMerlin.getSubmissionCommand();
+        final String recipe1Name = recipeMerlin.getName();
+        runSql(connection,
+            "create table " + tblName + "(id int)");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
+        runSql(connection,
+            "ALTER TABLE " + tblName + " CHANGE id id STRING COMMENT 'some_comment'");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void drTwoDstTablesTwoRequests() throws Exception {
+        final HCatClient clusterHC3 = cluster3.getClusterHelper().getHCatClient();
+        final Connection connection3 = cluster3.getClusterHelper().getHiveJdbcConnection();
+        runSql(connection3, "drop database if exists hdr_sdb1 cascade");
+        runSql(connection3, "create database hdr_sdb1");
+        runSql(connection3, "use hdr_sdb1");
+
+        final String tblName = "vanillaTable";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final String recipe1Name = recipeMerlin.getName();
+        final List<String> command1 = recipeMerlin.getSubmissionCommand();
+
+        final Bundle bundle = BundleUtil.readHCatBundle();
+        bundle.generateUniqueBundle(this);
+        recipeMerlin.withTargetCluster(new Bundle(bundle, cluster3).getClusterElement());
+        recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+        final List<String> command2 = recipeMerlin.getSubmissionCommand();
+        final String recipe2Name = recipeMerlin.getName();
+
+        runSql(connection, "create table " + tblName + "(comment string)");
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+        bootstrapCopy(connection, clusterFS, tblName, connection3, clusterFS3, tblName);
+
+        runSql(connection,
+            "insert into table " + tblName + " values"
+                + "('this string has been added post bootstrap - should appear after dr')");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed.");
+        Assert.assertEquals(Bundle.runFalconCLI(command2), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipe2Name, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster3, clusterHC3.getTable(DB_NAME, tblName), anAssert);
+        anAssert.assertAll();
+    }
+
+    @Test
+    public void drExternalToNonExternal() throws Exception {
+        final String tblName = "externalToNonExternal";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        createExternalTable(connection, clusterFS, baseTestHDFSDir + "click_data/", tblName);
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        //change column name
+        runSql(connection,
+            "alter table " + tblName + " change column data data_new string");
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
+            clusterHC.getTable(DB_NAME, tblName).getTabletype(),
+            "Source and destination tables should have different Tabletype");
+        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+            clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+            "Source and destination tables should have different value of property EXTERNAL");
+        anAssert.assertAll();
+    }
+
+    @Test
+    public void drExtPartitionedToNonExtPartitioned() throws Exception {
+        final String tblName = "extPartitionedToNonExtPartitioned";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        createExternalPartitionedTable(connection, clusterFS,
+            baseTestHDFSDir + "click_data/", tblName);
+        runSql(connection2,
+            "create table " + tblName + " (data string, time string) partitioned by (date_ string)");
+        runSql(connection2, "alter table " + tblName + " add partition "
+            + "(date_='2001-01-01') location '" + baseTestHDFSDir + "click_data/2001-01-01/'");
+        runSql(connection2, "alter table " + tblName + " add partition "
+            + "(date_='2001-01-02') location '" + baseTestHDFSDir + "click_data/2001-01-02/'");
+
+        runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-01') "
+            + "values ('click1', '01:01:01')");
+        runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-02') "
+            + "values ('click2', '02:02:02')");
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        //change column name
+        runSql(connection,
+            "alter table " + tblName + " change column data data_new string");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false);
+        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(),
+            clusterHC.getTable(DB_NAME, tblName).getTabletype(),
+            "Source and destination tables should have different Tabletype");
+        anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+            clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"),
+            "Source and destination tables should have different value of property EXTERNAL");
+        anAssert.assertAll();
+    }
+
+    /**
+     * 1 src tbl 1 dst tbl. Change table properties and comment at the source.
+     * Changes should get reflected at the destination.
+     */
+    @Test
+    public void drChangeCommentAndPropertyTest() throws Exception {
+        final String tblName = "myTable";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        runSql(connection, "create table " + tblName + "(field string)");
+        //add new table property
+        runSql(connection,
+            "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'initialValue')");
+        //set comment
+        runSql(connection,
+            "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment will be "
+                + "changed, SHOULD NOT appear')");
+
+        LOGGER.info(tblName + " before bootstrap copy: ");
+        runSql(connection, "describe extended " + tblName);
+
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        //change table property and comment
+        runSql(connection,
+            "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'anotherValue')");
+        runSql(connection,
+            "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment should "
+                + "appear after replication done')");
+
+        LOGGER.info(tblName + " after modifications, before replication: ");
+        runSql(connection, "describe extended " + tblName);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void dataGeneration() throws Exception {
+        runSql(connection, "use hdr_sdb1");
+        createVanillaTable(connection, "store_sales");
+        createSerDeTable(connection);
+        createPartitionedTable(connection);
+        createExternalTable(connection, clusterFS,
+            baseTestHDFSDir + "click_data/", "click_data");
+        createExternalPartitionedTable(connection, clusterFS,
+            baseTestHDFSDir + "click_data2/", "click_data2");
+
+        runSql(connection2, "use hdr_sdb1");
+        createVanillaTable(connection2, "store_sales");
+        createSerDeTable(connection2);
+        createPartitionedTable(connection2);
+        createExternalTable(connection2, clusterFS2,
+            baseTestHDFSDir + "click_data/", "click_data");
+        createExternalPartitionedTable(connection2, clusterFS2,
+            baseTestHDFSDir + "click_data2/", "click_data2");
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("hdr_sdb1"),
+            cluster2, clusterHC2.getDatabase("hdr_sdb1"), anAssert);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable("hdr_sdb1", "click_data"),
+            cluster2, clusterHC2.getTable("hdr_sdb1", "click_data"), anAssert);
+        anAssert.assertAll();
+    }
+
+    @Test(enabled = false)
+    public void assertionTest() throws Exception {
+        final SoftAssert anAssert = new SoftAssert();
+        HiveAssert.assertTableEqual(
+            cluster, clusterHC.getTable("default", "hcatsmoke10546"),
+            cluster2, clusterHC2.getTable("default", "hcatsmoke10548"), anAssert);
+        HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("default"), cluster2,
+            clusterHC2.getDatabase("default"), anAssert);
+        anAssert.assertAll();
+    }
+
+    /**
+     * Test creates a table on the first cluster using static partitioning. Then it creates the same
+     * table on the second cluster using dynamic partitioning. Finally it checks the equality of
+     * these tables.
+     * @throws SQLException
+     * @throws IOException
+     */
+    @Test
+    public void dynamicPartitionsTest() throws SQLException, IOException {
+        //create table with static partitions on first cluster
+        createPartitionedTable(connection, false);
+
+        //create table with dynamic partitions on second cluster
+        createPartitionedTable(connection2, true);
+
+        //check that both tables are equal
+        HiveAssert.assertTableEqual(
+            cluster, clusterHC.getTable("hdr_sdb1", "global_store_sales"),
+            cluster2, clusterHC2.getTable("hdr_sdb1", "global_store_sales"), new SoftAssert()
+        ).assertAll();
+    }
+
+    /**
+     * 1 src tbl 1 dst tbl replication. Insert/delete/replace partitions using dynamic partition
+     * queries. The changes should get reflected at the destination.
+     */
+    @Test
+    public void drInsertDropReplaceDynamicPartition() throws Exception {
+        final String tblName = "dynamicPartitionDR";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        //disable strict mode to use only dynamic partitions
+        runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+
+        runSql(connection,
+            "create table " + tblName + "(comment string) partitioned by (pname string)");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this partition is going to be deleted - should NOT appear after dr', 'DELETE')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this partition is going to be replaced - should NOT appear after dr', 'REPLACE')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this partition will have more data - should appear after dr', 'ADD_DATA')");
+
+        LOGGER.info(tblName + " before bootstrap copying: ");
+        runSql(connection, "select * from " + tblName);
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this partition has been added post bootstrap - should appear after dr', 'NEW_PART')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('more data has been added post bootstrap - should appear after dr', 'ADD_DATA')");
+        runSql(connection,
+            "alter table " + tblName + " drop partition(pname = 'DELETE')");
+        runSql(connection,
+            "alter table " + tblName + " drop partition(pname = 'REPLACE')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this partition has been replaced - should appear after dr', 'REPLACE')");
+
+        LOGGER.info(tblName + " after modifications, before replication: ");
+        runSql(connection, "select * from " + tblName);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    /**
+     * 1 src tbl 1 dst tbl replication. Insert/overwrite partitions using dynamic partition
+     * queries. The changes should get reflected at the destination.
+     * @throws Exception
+     */
+    @Test
+    public void drInsertOverwriteDynamicPartition() throws Exception {
+        final String tblName = "drInsertOverwritePartition";
+        final String hlpTblName = "drInsertOverwritePartitionHelperTbl";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName);
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        //disable strict mode to use only dynamic partitions
+        runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+
+        runSql(connection,
+            "create table " + hlpTblName + "(comment string) partitioned by (pname string)");
+        runSql(connection,
+            "insert into table " + hlpTblName + " partition (pname)"
+                + " values('overwrite data - should appear after dr', 'OVERWRITE_PART')");
+        runSql(connection,
+            "insert into table " + hlpTblName + " partition (pname)"
+            + " values('newdata row2 - should appear after dr', 'NEW_DATA')");
+        runSql(connection,
+            "insert into table " + hlpTblName + " partition (pname)"
+                + " values('newdata row1 - should appear after dr', 'NEW_DATA')");
+
+        runSql(connection,
+            "create table " + tblName + "(comment string) partitioned by (pname string)");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this data should be retained - should appear after dr', 'OLD_PART')");
+        runSql(connection,
+            "insert into table " + tblName + " partition (pname) values"
+                + "('this data should get overwritten - should NOT appear after dr', 'OVERWRITE_PART')");
+
+        LOGGER.info(tblName + " before bootstrap copying: ");
+        runSql(connection, "select * from " + tblName);
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        runSql(connection,
+            "insert overwrite table " + tblName + " partition (pname) "
+                + "select comment, pname from " + hlpTblName + " where comment REGEXP '^overwrite'");
+        runSql(connection,
+            "insert overwrite table " + tblName + " partition (pname) "
+                + "select comment, pname from " + hlpTblName + " where comment REGEXP '^newdata'");
+
+        LOGGER.info(tblName + " after modifications, before replication: ");
+        runSql(connection, "select * from " + tblName);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName),
+            cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    /**
+     * Run the recipe with different frequencies. Submission should go through.
+     * Check the frequency of the launched oozie job.
+     */
+    @Test(dataProvider = "frequencyGenerator")
+    public void differentRecipeFrequenciesTest(String frequency) throws Exception {
+        LOGGER.info("Testing with frequency: " + frequency);
+        String tblName = "myTable";
+        recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName)
+            .withFrequency(new Frequency(frequency));
+        runSql(connection, "create table " + tblName + "(comment string)");
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+        LOGGER.info("Submission went through.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        String filter = "name=FALCON_PROCESS_" + recipeMerlin.getName();
+        List<BundleJob> bundleJobs = OozieUtil.getBundles(clusterOC, filter, 0, 10);
+        List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
+        String bundleId = OozieUtil.getMaxId(bundleIds);
+        List<CoordinatorJob> coords = clusterOC.getBundleJobInfo(bundleId).getCoordinators();
+        List<String> cIds = new ArrayList<String>();
+        for (CoordinatorJob coord : coords) {
+            cIds.add(coord.getId());
+        }
+        String coordId = OozieUtil.getMinId(cIds);
+        CoordinatorJob job = clusterOC.getCoordJobInfo(coordId);
+        CoordinatorJob.Timeunit timeUnit = job.getTimeUnit();
+        String freq = job.getFrequency();
+        LOGGER.info("Frequency of running job: " + timeUnit + " " + freq);
+        Assert.assertTrue(frequency.contains(timeUnit.name().toLowerCase().replace("_", ""))
+            && frequency.contains(freq), "Running job has different frequency.");
+    }
+
+    @DataProvider(name = "frequencyGenerator")
+    public Object[][] frequencyGenerator() {
+        return new Object[][]{{"minutes(10)"}, {"minutes(10000)"},
+            {"days(3)"}, {"days(3000)"}, {"months(1)"}, {"months(1000)"}, };
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        try {
+            prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
+        } catch (Exception e) {
+            LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
+        }
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+
+}
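
The tests above share one skeleton: configure the recipe, submit it via the Falcon CLI, wait
for the first coordinator action to succeed, then diff the source and destination tables. A
minimal sketch of that pattern, assuming the fields and helpers declared in HiveDRTest and a
hypothetical table "myTable" (sketch only, not part of the commit):

    // Sketch: all names below come from the HiveDRTest class in the diff above.
    recipeMerlin.withSourceDb(DB_NAME).withSourceTable("myTable");
    final List<String> command = recipeMerlin.getSubmissionCommand();
    runSql(connection, "create table myTable(comment string)");
    bootstrapCopy(connection, clusterFS, "myTable", connection2, clusterFS2, "myTable");
    // Submit the recipe and wait for the first replication instance to succeed.
    Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
    InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
        CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
    // Source and destination tables must now be identical.
    HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, "myTable"),
        cluster2, clusterHC2.getTable(DB_NAME, "myTable"), new NotifyingAssert(true)).assertAll();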

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
new file mode 100644
index 0000000..a64bd6d
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.supportClasses.NotifyingAssert;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.Config;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.HiveAssert;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy;
+import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable;
+
+/**
+ * Hive DR Testing for Hive database replication.
+ */
+@Test(groups = "embedded")
+public class HiveDbDRTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class);
+    private final ColoHelper cluster = servers.get(0);
+    private final ColoHelper cluster2 = servers.get(1);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final FileSystem clusterFS2 = serverFS.get(1);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private HCatClient clusterHC;
+    private HCatClient clusterHC2;
+    private RecipeMerlin recipeMerlin;
+    private Connection connection;
+    private Connection connection2;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        clusterHC = cluster.getClusterHelper().getHCatClient();
+        clusterHC2 = cluster2.getClusterHelper().getHCatClient();
+        bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster);
+        bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+        final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+        Bundle.submitCluster(bundles[0]);
+
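+        // Pick the secure recipe template when running against secure clusters, the plain one otherwise.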
+        if (MerlinConstants.IS_SECURE) {
+            recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe",
+                FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+                .withRecipeCluster(srcCluster);
+        } else {
+            recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe",
+                FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY)
+                .withRecipeCluster(srcCluster);
+        }
+        recipeMerlin.withSourceCluster(srcCluster)
+            .withTargetCluster(tgtCluster)
+            .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+            .withValidity(TimeUtil.getTimeWrtSystemTime(-1), TimeUtil.getTimeWrtSystemTime(11));
+        recipeMerlin.setUniqueName(this.getClass().getSimpleName());
+
+        connection = cluster.getClusterHelper().getHiveJdbcConnection();
+
+        connection2 = cluster2.getClusterHelper().getHiveJdbcConnection();
+    }
+
+    private void setUpDb(String dbName, Connection conn) throws SQLException {
+        runSql(conn, "drop database if exists " + dbName + " cascade");
+        runSql(conn, "create database " + dbName);
+        runSql(conn, "use " + dbName);
+    }
+
+    @Test
+    public void drDbDropDb() throws Exception {
+        final String dbName = "drDbDropDb";
+        setUpDb(dbName, connection);
+        setUpDb(dbName, connection2);
+        recipeMerlin.withSourceDb(dbName).withSourceTable("*");
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        runSql(connection, "drop database " + dbName);
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        final List<String> dstDbs = runSql(connection2, "show databases");
+        Assert.assertFalse(dstDbs.contains(dbName), "dstDbs = " + dstDbs + " was not expected to "
+            + "contain " + dbName);
+    }
+
+
+    @Test(dataProvider = "isDBReplication")
+    public void drDbFailPass(Boolean isDBReplication) throws Exception {
+        final String dbName = "drDbFailPass";
+        final String tblName = "vanillaTable";
+        final String hiveWarehouseLocation = Config.getProperty("hive.warehouse.location", "/apps/hive/warehouse/");
+        final String dbPath = HadoopUtil.joinPath(hiveWarehouseLocation, dbName.toLowerCase() + ".db");
+        setUpDb(dbName, connection);
+        runSql(connection, "create table " + tblName + "(data string)");
+        setUpDb(dbName, connection2);
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+
+        recipeMerlin.withSourceDb(dbName).withSourceTable(isDBReplication ? "*" : tblName);
+
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        runSql(connection, "insert into table " + tblName + " values('cannot be replicated now')");
+        final String noReadWritePerm = "d---r-xr-x";
+        LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + noReadWritePerm);
+        clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(noReadWritePerm));
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.KILLED, EntityType.PROCESS);
+
+        final String readWritePerm = "drwxr-xr-x";
+        LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + readWritePerm);
+        clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(readWritePerm));
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
+            cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)
+        ).assertAll();
+    }
+
+    @Test
+    public void drDbAddDropTable() throws Exception {
+        final String dbName = "drDbAddDropTable";
+        final String tblToBeDropped = "table_to_be_dropped";
+        final String tblToBeDroppedAndAdded = "table_to_be_dropped_and_readded";
+        final String newTableToBeAdded = "new_table_to_be_added";
+
+        setUpDb(dbName, connection);
+        setUpDb(dbName, connection2);
+        recipeMerlin.withSourceDb(dbName).withSourceTable("*")
+            .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        createVanillaTable(connection, tblToBeDropped);
+        createVanillaTable(connection, tblToBeDroppedAndAdded);
+        bootstrapCopy(connection, clusterFS, tblToBeDropped,
+            connection2, clusterFS2, tblToBeDropped);
+        bootstrapCopy(connection, clusterFS, tblToBeDroppedAndAdded,
+            connection2, clusterFS2, tblToBeDroppedAndAdded);
+
+        /* For first replication - two tables are dropped & one table is added */
+        runSql(connection, "drop table " + tblToBeDropped);
+        runSql(connection, "drop table " + tblToBeDroppedAndAdded);
+        createVanillaTable(connection, newTableToBeAdded);
+
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        final NotifyingAssert anAssert = new NotifyingAssert(true);
+        HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
+            cluster2, clusterHC2.getDatabase(dbName), anAssert);
+
+        /* For second replication - a dropped table is added back */
+        createVanillaTable(connection, tblToBeDroppedAndAdded);
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName),
+            cluster2, clusterHC2.getDatabase(dbName), anAssert);
+        anAssert.assertAll();
+    }
+
+    @Test(enabled = false)
+    public void drDbNonReplicatableTable() throws Exception {
+        final String dbName = "drDbNonReplicatableTable";
+        final String tblName = "vanillaTable";
+        final String tblView = "vanillaTableView";
+        final String tblOffline = "offlineTable";
+
+        setUpDb(dbName, connection);
+        setUpDb(dbName, connection2);
+        recipeMerlin.withSourceDb(dbName).withSourceTable("*")
+            .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes));
+        final List<String> command = recipeMerlin.getSubmissionCommand();
+
+        createVanillaTable(connection, tblName);
+        runSql(connection, "create view " + tblView + " as select * from " + tblName);
+        createVanillaTable(connection, tblOffline);
+        bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName);
+        bootstrapCopy(connection, clusterFS, tblOffline, connection2, clusterFS2, tblOffline);
+        final String newComment = "'new comment for offline table should not reach destination'";
+        runSql(connection,
+            "alter table " + tblOffline + " set tblproperties ('comment' =" + newComment +")");
+        runSql(connection, "alter table " + tblOffline + " enable offline");
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        //vanilla table gets replicated, offline table & view are not replicated
+        HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName),
+            cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)).assertAll();
+        final List<String> dstTables = runSql(connection2, "show tables");
+        Assert.assertFalse(dstTables.contains(tblView),
+            "dstTables = " + dstTables + " was not expected to contain " + tblView);
+        final List<String> dstComment =
+            runSql(connection2, "show tblproperties " + tblOffline + "('comment')");
+        Assert.assertFalse(dstComment.contains(newComment),
+            tblOffline + " comment = " + dstComment + " was not expected to contain " + newComment);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        try {
+            prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null);
+        } catch (Exception e) {
+            LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e);
+        }
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+
+    @DataProvider
+    public Object[][] isDBReplication() {
+        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+    }
+}
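
drDbFailPass above exercises failure and recovery by flipping HDFS permissions on the
replicated database directory. The permission strings are the 10-character unix symbolic
form accepted by Hadoop's FsPermission.valueOf; a small illustration of the two states used
in the test (stock Hadoop API, not part of the commit):

    import org.apache.hadoop.fs.permission.FsPermission;

    // Owner bits cleared, group/other limited to read/execute: nothing can write
    // into the db directory, so the replication instance ends up KILLED.
    FsPermission blocked = FsPermission.valueOf("d---r-xr-x");
    // Normal directory permissions restored; the retried instance can SUCCEED.
    FsPermission restored = FsPermission.valueOf("drwxr-xr-x");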

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
new file mode 100644
index 0000000..9eb389a
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.apache.falcon.regression.core.util.HadoopUtil.writeDataForHive;
+import static org.apache.falcon.regression.core.util.HiveUtil.runSql;
+
+/**
+ * Create Hive tables for testing Hive DR. Note that this is not expected to be used
+ * outside of the HiveDR tests.
+ */
+final class HiveObjectCreator {
+    private static final Logger LOGGER = Logger.getLogger(HiveObjectCreator.class);
+    private static final String HDFS_TMP_DIR = "/tmp/hive_objects/";
+
+    private HiveObjectCreator() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
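+    /**
+     * Bootstrap the destination table before replication starts: export the source table to a
+     * staging dir, copy the dump to the destination cluster, import it into the destination
+     * table and clean up the staging dirs on both sides.
+     */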
+    static void bootstrapCopy(Connection srcConnection, FileSystem srcFs, String srcTable,
+                              Connection dstConnection, FileSystem dstFs, String dstTable) throws Exception {
+        LOGGER.info("Starting bootstrap...");
+        final String dumpPath = HDFS_TMP_DIR + srcTable + "/";
+        HadoopUtil.recreateDir(srcFs, dumpPath);
+        runSqlQuietly(srcConnection, "dfs -chmod -R 777 " + dumpPath);
+        HadoopUtil.deleteDirIfExists(dumpPath, dstFs);
+        runSql(srcConnection, "export table " + srcTable + " to '" + dumpPath + "' FOR REPLICATION('ignore')");
+        FileUtil.copy(srcFs, new Path(dumpPath), dstFs, new Path(dumpPath), false, true, new Configuration());
+        runSqlQuietly(dstConnection, "dfs -chmod -R 777 " + dumpPath);
+        runSql(dstConnection, "import table " + dstTable + " from '" + dumpPath + "'");
+        HadoopUtil.deleteDirIfExists(dumpPath, srcFs);
+        HadoopUtil.deleteDirIfExists(dumpPath, dstFs);
+        LOGGER.info("Finished bootstrap");
+    }
+
+    /* Relax permissions through a hive query - the dump directory is owned by the hive
+     * user, so the test user could not remove it otherwise. */
+    private static void runSqlQuietly(Connection srcConnection, String sql) {
+        try {
+            runSql(srcConnection, sql);
+        } catch (SQLException ex) {
+            LOGGER.info("Exception while hive ql execution: " + ex.getMessage());
+        }
+    }
+
+    /**
+     * Create an external table.
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @param fs filesystem object to upload the data
+     * @param clickDataLocation location to upload the data to
+     * @param tableName name of the table to create
+     * @throws IOException
+     * @throws SQLException
+     */
+    static void createExternalTable(Connection connection, FileSystem fs, String
+        clickDataLocation, String tableName) throws IOException, SQLException {
+        HadoopUtil.deleteDirIfExists(clickDataLocation, fs);
+        fs.mkdirs(new Path(clickDataLocation));
+        fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault());
+        writeDataForHive(fs, clickDataLocation,
+            new StringBuffer("click1").append((char) 0x01).append("01:01:01").append("\n")
+                .append("click2").append((char) 0x01).append("02:02:02"), true);
+        runSql(connection, "create external table " + tableName
+            + " (data string, time string) "
+            + "location '" + clickDataLocation + "'");
+        runSql(connection, "select * from " + tableName);
+    }
+
+
+    /**
+     * Create an external partitioned table.
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @param fs filesystem object to upload the data
+     * @param clickDataLocation location to upload the data to
+     * @param tableName name of the table to create
+     * @throws IOException
+     * @throws SQLException
+     */
+    static void createExternalPartitionedTable(Connection connection, FileSystem fs, String
+        clickDataLocation, String tableName) throws IOException, SQLException {
+        final String clickDataPart1 = clickDataLocation + "2001-01-01/";
+        final String clickDataPart2 = clickDataLocation + "2001-01-02/";
+        fs.mkdirs(new Path(clickDataLocation));
+        fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault());
+        writeDataForHive(fs, clickDataPart1,
+            new StringBuffer("click1").append((char) 0x01).append("01:01:01"), true);
+        writeDataForHive(fs, clickDataPart2,
+            new StringBuffer("click2").append((char) 0x01).append("02:02:02"), true);
+        runSql(connection, "create external table " + tableName
+            + " (data string, time string) partitioned by (date_ string) "
+            + "location '" + clickDataLocation + "'");
+        runSql(connection, "alter table " + tableName + " add partition "
+            + "(date_='2001-01-01') location '" + clickDataPart1 + "'");
+        runSql(connection, "alter table " + tableName + " add partition "
+            + "(date_='2001-01-02') location '" + clickDataPart2 + "'");
+        runSql(connection, "select * from " + tableName);
+    }
+
+    /**
+     * Create a partitioned table.
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @throws SQLException
+     */
+    static void createPartitionedTable(Connection connection) throws SQLException {
+        runSql(connection, "create table global_store_sales "
+            + "(customer_id string, item_id string, quantity float, price float, time timestamp) "
+            + "partitioned by (country string)");
+        runSql(connection,
+            "insert into table global_store_sales partition (country = 'us') values"
+                + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01')");
+        runSql(connection,
+            "insert into table global_store_sales partition (country = 'uk') values"
+                + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+        runSql(connection, "select * from global_store_sales");
+    }
+
+    /**
+     * Create a plain old table.
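+     * <p>Usage sketch (illustrative): {@code createVanillaTable(connection, "store_sales")}.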
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @param tblName name of the table to create
+     * @throws SQLException
+     */
+    static void createVanillaTable(Connection connection, String tblName) throws SQLException {
+        //vanilla table
+        runSql(connection, "create table " + tblName
+            + "(customer_id string, item_id string, quantity float, price float, time timestamp)");
+        runSql(connection, "insert into table " + tblName + " values "
+            + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), "
+            + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+        runSql(connection, "select * from " + tblName);
+    }
+
+    /**
+     * Create a partitioned table with either dynamic or static partitions.
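+     * <p>Usage sketch (illustrative): {@code createPartitionedTable(connection, true)}
+     * creates the same rows but lets Hive derive the partition values dynamically.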
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @param dynamic whether partitions should be added dynamically or statically
+     * @throws SQLException
+     */
+    static void createPartitionedTable(Connection connection,
+                                       boolean dynamic) throws SQLException {
+        String [][] partitions = {
+            {"us", "Kansas", },
+            {"us", "California", },
+            {"au", "Queensland", },
+            {"au", "Victoria", },
+        };
+        //create table
+        runSql(connection, "drop table global_store_sales");
+        runSql(connection, "create table global_store_sales(customer_id string,"
+            + " item_id string, quantity float, price float, time timestamp) "
+            + "partitioned by (country string, state string)");
+        //provide data
+        String query;
+        if (dynamic) {
+            //disable strict mode so that both partition columns can be dynamic
+            runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict");
+            query = "insert into table global_store_sales partition"
+                + "(country, state) values('c%3$s', 'i%3$s', '%3$s', '%3$s', "
+                + "'2001-01-01 01:01:0%3$s', '%1$s', '%2$s')";
+        } else {
+            query = "insert into table global_store_sales partition"
+                + "(country = '%1$s', state = '%2$s') values('c%3$s', 'i%3$s', '%3$s', '%3$s', "
+                + "'2001-01-01 01:01:0%3$s')";
+        }
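+        // e.g. the first loop iteration (partitions[0], i = 0) of the dynamic variant runs:
+        //   insert into table global_store_sales partition(country, state)
+        //   values('c1', 'i1', '1', '1', '2001-01-01 01:01:01', 'us', 'Kansas')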
+        for (int i = 0; i < partitions.length; i++) {
+            runSql(connection, String.format(query, partitions[i][0], partitions[i][1], i + 1));
+        }
+        runSql(connection, "select * from global_store_sales");
+    }
+
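+    /**
+     * Create a table whose rows are stored as JSON via the hcatalog JsonSerDe.
+     * @param connection jdbc connection object to use for issuing queries to hive
+     * @throws SQLException
+     */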
+    static void createSerDeTable(Connection connection) throws SQLException {
+        runSql(connection, "create table store_json "
+            + "(customer_id string, item_id string, quantity float, price float, time timestamp) "
+            + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' ");
+        runSql(connection, "insert into table store_json values "
+            + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), "
+            + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')");
+        runSql(connection, "select * from store_json");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 8ef6bb6..7ad4c8e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -88,7 +88,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
     }
 
     /*
-     * Prepares running feed with instances ordered (desc): 1 waiting, 1 suspended, 1 running,
+     * Prepares running feed with instances ordered (desc): 1 waiting, 1 running, 1 suspended,
      * 3 waiting and 6 killed. Testing is based on expected instances statuses.
      */
     private void prepareScenario() throws AuthenticationException, IOException, URISyntaxException,
@@ -284,9 +284,9 @@ public class ListFeedInstancesTest extends BaseTestClass {
             "start=" + TimeUtil.addMinsToTime(endTime, -5) + "&end=" + endTime, null);
         InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
 
-        //only start, actual feed startTime, should get 10 most recent instances(by default).
+        //only start, actual feed startTime: should get instances 1-10 (end is automatically set to start + frequency * 10).
         r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 0, 1, 3, 6);
 
         //only start, greater than the actual startTime.
         r = prism.getFeedHelper().listInstances(feedName,

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
index f35e12d..be8a631 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.regression.lineage;
 
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.BundleUtil;
@@ -75,15 +74,12 @@ public class ListProcessInstancesTest extends BaseTestClass {
         bundles[0].setInputFeedDataPath(feedDataLocation);
         bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output" + MINUTE_DATE_PATTERN);
         bundles[0].setProcessValidity(startTime, endTime);
-        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitAndScheduleProcess();
         processName = bundles[0].getProcessName();
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         //create data for processes to run and wait some time for instances to make progress
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
     }
@@ -94,8 +90,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
     }
 
     /**
-     * List process instances using orderBy - status, -startTime, -endTime params, expecting list of
-     * process instances in the right order.
+     * List process instances using orderBy - status, -startTime, -endTime params.
      */
     @Test
     public void testProcessOrderBy() throws Exception {
@@ -172,12 +167,12 @@ public class ListProcessInstancesTest extends BaseTestClass {
 
         //use start option without numResults. 10 instances expected
         r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null);
-        InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0);
+        InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0);
 
         //use start option with numResults value which is smaller than the default.
         r = prism.getProcessHelper().listInstances(processName,
             "start=" + startTime + "&numResults=8", null);
-        InstanceUtil.validateResponse(r, 8, 3, 0, 5, 0);
+        InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0);
 
         //use start option with numResults value greater than the default. All 12 instances expected
         r = prism.getProcessHelper().listInstances(processName,
@@ -242,8 +237,6 @@ public class ListProcessInstancesTest extends BaseTestClass {
         InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
 
         //wait till new instances be RUNNING and total status count be stable
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
 
@@ -300,7 +293,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
                 + "&end=" + TimeUtil.addMinsToTime(startTime, 16), null);
         InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
 
-        //only start, actual startTime, should get 10 most recent instances
+        //only start, actual startTime (end is automatically set to start + frequency * 10)
         r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null);
         InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index 4946b30..10ab192 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -87,7 +87,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
     @BeforeMethod(alwaysRun = true)
     public void testSetup() throws Exception {
-        Bundle b = BundleUtil.readUpdateBundle();
+        final Bundle b = BundleUtil.readUpdateBundle();
         bundles[0] = new Bundle(b, cluster1);
         bundles[0].generateUniqueBundle(this);
         bundles[1] = new Bundle(b, cluster2);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
index b7e6861..e8b7ed8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
@@ -68,7 +68,7 @@ public class PrismFeedDeleteTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void setUp() throws Exception {
         restartRequired = false;
-        Bundle bundle = BundleUtil.readELBundle();
+        final Bundle bundle = BundleUtil.readELBundle();
         bundles[0] = new Bundle(bundle, cluster1);
         bundles[0].generateUniqueBundle(this);
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
@@ -361,8 +361,10 @@ public class PrismFeedDeleteTest extends BaseTestClass {
     @Test(groups = {"multiCluster"})
     public void testServer1FeedDeleteNonExistentWhen1ColoIsDownDuringDelete() throws Exception {
         restartRequired = true;
-        bundles[0] = new Bundle(bundles[0], cluster1);
-        bundles[1] = new Bundle(bundles[1], cluster2);
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
+        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
 
         bundles[0].setCLusterColo(cluster1Colo);
         LOGGER.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
index f90a76b..23878df 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
@@ -59,8 +59,7 @@ public class PrismSubmitTest extends BaseTestClass {
     @BeforeMethod(alwaysRun = true)
     public void setUp() throws Exception {
         restartRequired = false;
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster1);
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1);
         bundles[0].generateUniqueBundle(this);
         bundles[0].setProcessWorkflow(aggregateWorkflowDir);
     }
@@ -308,7 +307,7 @@ public class PrismSubmitTest extends BaseTestClass {
     @Test(groups = {"prism", "0.2", "distributed"})
     public void submitClusterReSubmitAlreadyPartial() throws Exception {
         restartRequired = true;
-        bundles[1] = new Bundle(bundles[0], cluster2);
+        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
         bundles[1].generateUniqueBundle(this);
         bundles[1].setProcessWorkflow(aggregateWorkflowDir);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index e10f8d1..afa01c1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -83,7 +83,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws IOException {
-        Bundle bundle = BundleUtil.readFeedReplicationBundle();
+        final Bundle bundle = BundleUtil.readFeedReplicationBundle();
         bundles[0] = new Bundle(bundle, cluster1);
         bundles[1] = new Bundle(bundle, cluster2);
         bundles[2] = new Bundle(bundle, cluster3);
@@ -92,8 +92,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         bundles[1].generateUniqueBundle(this);
         bundles[2].generateUniqueBundle(this);
 
-        processBundle = BundleUtil.readELBundle();
-        processBundle = new Bundle(processBundle, cluster1);
+        processBundle = new Bundle(BundleUtil.readELBundle(), cluster1);
         processBundle.generateUniqueBundle(this);
         processBundle.setProcessWorkflow(aggregateWorkflowDir);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
index 63da183..c53e06b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java
@@ -158,6 +158,9 @@ public class EntitiesTableTest extends BaseUITestClass {
 
     @DataProvider
     public Object[][] getBoolean() {
-        return new Boolean[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+        return new Boolean[][]{
+            {Boolean.TRUE},
+            {Boolean.FALSE},
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
index 4c1e379..4ad775e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
@@ -588,8 +588,8 @@ public class EntityPageTest extends BaseUITestClass {
             .getProcessInstanceLogs(process.getName(),
                 "start=" + nominalTimeOfSelectedInstance
                 + "&end=" + TimeUtil.addMinsToTime(nominalTimeOfSelectedInstance, 1));
-        Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?"),
-            processInstanceLogs.getInstances()[0].getLogFile(),
+        Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?").toLowerCase(),
+            processInstanceLogs.getInstances()[0].getLogFile().toLowerCase(),
             "Only one instance is selected. "
                 + "Clicking instance log button should take user to oozie page.");
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
index d8aed28..47b1d19 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java
@@ -139,6 +139,7 @@ public class FeedSetupTest extends BaseUITestClass{
     @Test
     public void testWizardCancel() throws Exception {
         // Step 1 - Check cancel on the first page - General Info Page
+        feedWizardPage.setFeedGeneralInfo(feed);
         feedWizardPage.clickCancel();
         searchPage.checkPage();
 
@@ -146,6 +147,7 @@ public class FeedSetupTest extends BaseUITestClass{
         feedWizardPage = searchPage.getPageHeader().doCreateFeed();
         feedWizardPage.setFeedGeneralInfo(feed);
         feedWizardPage.clickNext();
+        feedWizardPage.setFeedPropertiesInfo(feed);
         feedWizardPage.clickCancel();
         searchPage.checkPage();
 
@@ -155,6 +157,7 @@ public class FeedSetupTest extends BaseUITestClass{
         feedWizardPage.clickNext();
         feedWizardPage.setFeedPropertiesInfo(feed);
         feedWizardPage.clickNext();
+        feedWizardPage.setFeedLocationInfo(feed);
         feedWizardPage.clickCancel();
         searchPage.checkPage();
 
@@ -166,6 +169,7 @@ public class FeedSetupTest extends BaseUITestClass{
         feedWizardPage.clickNext();
         feedWizardPage.setFeedLocationInfo(feed);
         feedWizardPage.clickNext();
+        feedWizardPage.setFeedClustersInfo(feed);
         feedWizardPage.clickCancel();
         searchPage.checkPage();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
index f71739d..20864f6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java
@@ -111,18 +111,26 @@ public class HomePageTest extends BaseUITestClass {
 
         final String clusterXml = bundle.getClusterElement().toString();
         homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(clusterXml));
+        String alert = homePage.getActiveAlertText();
+        Assert.assertTrue(alert.contains("Submit successful"), "Unexpected alert: '" + alert + "'");
         AssertUtil.assertSucceeded(prism.getClusterHelper().getEntityDefinition(clusterXml));
 
         final String feedXml = bundle.getInputFeedFromBundle();
         homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(feedXml));
+        alert = homePage.getActiveAlertText();
+        Assert.assertTrue(alert.contains("Submit successful"), "Unexpected alert: '" + alert + "'");
         AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(feedXml));
 
         final String outputFeedXml = bundle.getOutputFeedFromBundle();
         homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(outputFeedXml));
+        alert = homePage.getActiveAlertText();
+        Assert.assertTrue(alert.contains("Submit successful"), "Unexpected alert: '" + alert + "'");
         AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(outputFeedXml));
 
         final String processXml = bundle.getProcessObject().toString();
         homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(processXml));
+        alert = homePage.getActiveAlertText();
+        Assert.assertTrue(alert.contains("Submit successful"), "Unexpected alert: '" + alert + "'");
         AssertUtil.assertSucceeded(prism.getProcessHelper().getEntityDefinition(processXml));
 
     }
@@ -161,7 +169,6 @@ public class HomePageTest extends BaseUITestClass {
         writer.close();
 
         homePage.getPageHeader().uploadXml(xmlFile.getAbsolutePath());
-        Thread.sleep(1000);
         alertText = homePage.getActiveAlertText();
         Assert.assertEquals(alertText, "Invalid xml. File not uploaded",
             "XML file with invalid text was allowed to be uploaded");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
index 9ec936d..8598b18 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java
@@ -22,7 +22,11 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.testHelper.BaseUITestClass;
 import org.apache.falcon.regression.ui.search.EntityPage;
 import org.apache.falcon.regression.ui.search.InstancePage;
@@ -33,9 +37,7 @@ import org.apache.falcon.resource.InstancesResult;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -59,14 +61,10 @@ public class InstancePageTest extends BaseUITestClass {
     private String instance = "2010-01-02T01:00Z";
     private String processName;
 
-    @BeforeClass(alwaysRun = true)
-    public void setup() {
-        openBrowser();
-        searchPage = LoginPage.open(getDriver()).doDefaultLogin();
-    }
-
     @BeforeMethod(alwaysRun = true)
     public void submitEntities() throws Exception {
+        openBrowser();
+        searchPage = LoginPage.open(getDriver()).doDefaultLogin();
         cleanAndGetTestDir();
         HadoopUtil.uploadDir(serverFS.get(0), aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
         bundles[0] = BundleUtil.readELBundle();
@@ -184,11 +182,7 @@ public class InstancePageTest extends BaseUITestClass {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws IOException {
-        removeTestClassEntities();
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void tearDownClass() {
         closeBrowser();
+        removeTestClassEntities();
     }
 }