You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:22 UTC

[35/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index c26a075..5173d8b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -28,20 +28,23 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.util.Shell;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,17 +64,22 @@ import static org.junit.Assert.assertNull;
 
 public class TestReplicationScenarios {
 
-  final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+  @Rule
+  public final TestName testName = new TestName();
+
+  private final static String DBNOTIF_LISTENER_CLASSNAME =
+      "org.apache.hive.hcatalog.listener.DbNotificationListener";
       // FIXME : replace with hive copy once that is copied
-  final static String tid =
+  private final static String tid =
       TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
-  final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+  private final static String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
 
-  static HiveConf hconf;
-  static boolean useExternalMS = false;
-  static int msPort;
-  static Driver driver;
-  static HiveMetaStoreClient metaStoreClient;
+  private static HiveConf hconf;
+  private static boolean useExternalMS = false;
+  private static int msPort;
+  private static Driver driver;
+  private static HiveMetaStoreClient metaStoreClient;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private ArrayList<String> lastResults;
@@ -93,7 +101,7 @@ public class TestReplicationScenarios {
       return;
     }
 
-    hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -140,6 +148,32 @@ public class TestReplicationScenarios {
     ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
   }
 
+  @Test
+  public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
+    String dbName = createDB(testName.getMethodName());
+    run("CREATE FUNCTION " + dbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar  'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+    String replicatedDbName = loadAndVerify(dbName);
+    run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
+    verifyResults(new String[] { replicatedDbName + ".testFunction" });
+  }
+
+  private String loadAndVerify(String dbName) throws IOException {
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String dumpLocation = getResult(0, 0);
+    String lastReplicationId = getResult(0, 1, true);
+    String replicatedDbName = dbName + "_replicated";
+    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    printOutput();
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+    return replicatedDbName;
+  }
+
+
   /**
    * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
    * Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -148,12 +182,8 @@ public class TestReplicationScenarios {
    */
   @Test
   public void testBasic() throws IOException {
-
-    String testName = "basic";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -164,9 +194,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -181,31 +211,19 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
 
-    advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-
-    verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
+    String replicatedDbName = loadAndVerify(dbName);
 
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
     verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
     verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
   }
 
   @Test
   public void testBasicWithCM() throws Exception {
-
-    String testName = "basic_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -217,10 +235,10 @@ public class TestReplicationScenarios {
     String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
-    String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+    String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -280,12 +298,61 @@ public class TestReplicationScenarios {
   }
 
   @Test
-  public void testIncrementalAdds() throws IOException {
-    String testName = "incrementalAdds";
+  public void testBootstrapLoadOnExistingDb() throws IOException {
+    String testName = "bootstrapLoadOnExistingDb";
     LOG.info("Testing "+testName);
     String dbName = testName + "_" + tid;
 
     run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    createTestDataFile(unptn_locn, unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    // Create an empty database to load
+    run("CREATE DATABASE " + dbName + "_empty");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    // Load to an empty database
+    run("REPL LOAD " + dbName + "_empty FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return same repl ID as dump
+    verifyRun("REPL STATUS " + dbName + "_empty", replDumpId);
+    verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data);
+
+    String[] nullReplId = new String[]{ "NULL" };
+
+    // Create a database with a table
+    run("CREATE DATABASE " + dbName + "_withtable");
+    run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE");
+    // Load using same dump to a DB with table. It should fail as DB is not empty.
+    verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return NULL
+    verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId);
+
+    // Create a database with a view
+    run("CREATE DATABASE " + dbName + "_withview");
+    run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE");
+    run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned");
+    // Load using same dump to a DB with view. It should fail as DB is not empty.
+    verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return NULL
+    verifyRun("REPL STATUS " + dbName + "_withview", nullReplId);
+  }
+
+  @Test
+  public void testIncrementalAdds() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -304,9 +371,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -368,11 +435,8 @@ public class TestReplicationScenarios {
   @Test
   public void testDrops() throws IOException {
 
-    String testName = "drops";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -383,9 +447,9 @@ public class TestReplicationScenarios {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -482,10 +546,7 @@ public class TestReplicationScenarios {
   public void testDropsWithCM() throws IOException {
 
     String testName = "drops_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -608,10 +669,7 @@ public class TestReplicationScenarios {
   public void testAlters() throws IOException {
 
     String testName = "alters";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -793,10 +851,7 @@ public class TestReplicationScenarios {
   @Test
   public void testIncrementalLoad() throws IOException {
     String testName = "incrementalLoad";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -844,6 +899,7 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
 
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
         + ".ptned PARTITION(b=1)");
     verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
@@ -873,15 +929,14 @@ public class TestReplicationScenarios {
 
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
   }
 
   @Test
   public void testIncrementalInserts() throws IOException {
     String testName = "incrementalInserts";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
 
     advanceDumpDir();
@@ -892,13 +947,14 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
+
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
-    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
 
     run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
     run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
-    verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+    verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -909,14 +965,227 @@ public class TestReplicationScenarios {
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
-    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+    String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+    run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+  }
+
+  @Test
+  public void testIncrementalInsertToPartition() throws IOException {
+    String testName = "incrementalInsertToPartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    // Insert overwrite on existing partition
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+    // Insert overwrite into a partition (b=3) that does not exist yet
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
+  }
+
+  @Test
+  public void testViewsReplication() throws IOException {
+    String testName = "viewsReplication";
+    String dbName = createDB(testName);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE VIEW " + dbName + ".virtual_view AS SELECT * FROM " + dbName + ".unptned");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned", empty);
+    verifySetup("SELECT * from " + dbName + ".unptned", empty);
+    verifySetup("SELECT * from " + dbName + ".virtual_view", empty);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    verifySetup("SELECT * from " + dbName + ".virtual_view", unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+    run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view AS SELECT a FROM " + dbName + ".ptned where b=1");
+    verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    LOG.info("Bootstrap-dump: Dumped to {} with id {}",replDumpLocn,replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+
+    run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+    verifySetup("SELECT a from " + dbName + ".virtual_view2", ptn_data_2);
+
+    // Create a view whose name already exists, to verify that the failure flow clears the added create_table event.
+    run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+
+    run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view2 AS SELECT * FROM " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data);
+
+    // Perform REPL-DUMP/LOAD
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId );
+    String incrementalDumpLocn = getResult(0,0);
+    String incrementalDumpId = getResult(0,1,true);
+    LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {incrementalDumpId});
+
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where b=1", ptn_data_1);
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view2", ptn_data_2);
+    verifyRun("SELECT * from " + dbName + "_dupe.mat_view2", unptn_data);
+  }
+
+  @Test
+  public void testDumpLimit() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+    String[] unptn_data = new String[] { "eleven", "thirteen", "twelve" };
+    String[] unptn_data_load1 = new String[] { "eleven" };
+    String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+    // 3 events to insert, last repl ID: replDumpId+3
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    // 3 events to insert, last repl ID: replDumpId+6
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    // 3 events to insert, last repl ID: replDumpId+9
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
 
-    String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
-    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')");
-    verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+    advanceDumpDir();
+    Integer lastReplID = Integer.valueOf(replDumpId);
+    lastReplID += 1000;
+    String toReplID = String.valueOf(lastReplID);
+
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -924,11 +1193,312 @@ public class TestReplicationScenarios {
     incrementalDumpId = getResult(0, 1, true);
     LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
     replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+  }
+
+  @Test
+  public void testExchangePartition() throws IOException {
+    String testName = "exchangePartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_src(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_dest(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+
+    String[] empty = new String[] {};
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+    // Source layout: (b=1, c=1) holds ptn_data_1; (b=2, c=2) and (b=2, c=3) both hold ptn_data_2.
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned_src ADD PARTITION (b=2, c=2)");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    // Bootstrap dump and load into <dbName>_dupe; ptned_dest must still be empty on the replica.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+    // Exchange single partitions using complete partition-spec (all partition columns)
+    run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=1, c=1) WITH TABLE " + dbName + ".ptned_src");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty);
+    // Incremental dump and load: the replica should now show (b=1, c=1) moved into ptned_dest.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+    // Exchange multiple partitions using partial partition-spec (only one partition column)
+    run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=2) WITH TABLE " + dbName + ".ptned_src");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    // Incremental dump and load: the replica should now show both b=2 partitions moved as well.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+  }
+
+  @Test
+  public void testTruncateTable() throws IOException {
+    String testName = "truncateTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    // Bootstrap while the table is still empty, so the inserts below are shipped by the incremental dump.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
 
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned", empty);
+    // Replicate the truncate; source and replica must both be empty afterwards.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+    String[] unptn_data_after_ins = new String[] { "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+    // Replicate an insert made after the truncate; only the new row may appear on the replica.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+  }
+
+  @Test
+  public void testTruncatePartitionedTable() throws IOException {
+    String testName = "truncatePartitionedTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+    // Bootstrap dump and load into <dbName>_dupe; the replica must hold all four partitions.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+    // On the source: truncate a single partition of ptned_1, then the whole of ptned_2.
+    run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_2");
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+    // Incremental dump and load; replica checks use verifyRun, like the other replica checks in this class.
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
+  }
+
+  @Test
+  public void testTruncateWithCM() throws IOException {
+    String testName = "truncateWithCM";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+    String[] empty = new String[] {};
+    String[] unptn_data = new String[] { "eleven", "thirteen" };
+    String[] unptn_data_load1 = new String[] { "eleven" };
+    String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+    // 3 events to insert, last repl ID: replDumpId+3
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    // 3 events to insert, last repl ID: replDumpId+6
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    // 2 events to truncate, last repl ID: replDumpId+8
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty);
+    // 3 events to insert, last repl ID: replDumpId+11
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+    // Load the bootstrap dump only now; the changes made above are replayed by the incremental dumps below.
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    // Dump and load only first insert (1 record)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+    // Dump and load only second insert (2 records)
+    advanceDumpDir();
+    long lastReplID = Long.parseLong(replDumpId);
+    lastReplID += 1000;
+    String toReplID = String.valueOf(lastReplID);
+
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+    // Dump and load only truncate (0 records)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 2");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty);
+
+    // Dump and load insert after truncate (1 record)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
   }
 
   @Test
@@ -953,11 +1523,8 @@ public class TestReplicationScenarios {
 
     // Now, to actually testing status - first, we bootstrap.
 
-    String testName = "incrementalStatus";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String lastReplDumpLocn = getResult(0, 0);
@@ -1012,6 +1579,13 @@ public class TestReplicationScenarios {
 
   }
 
+  private static String createDB(String name) {
+    LOG.info("Testing " + name);
+    String dbName = name + "_" + tid;
+    run("CREATE DATABASE " + dbName);
+    return dbName;
+  }
+
   @Test
   public void testEventFilters(){
     // Test testing that the filters introduced by EventUtils are working correctly.
@@ -1031,8 +1605,8 @@ public class TestReplicationScenarios {
     // events to those that match the dbname and tblname provided to the filter.
     // If the tblname passed in to the filter is null, then it restricts itself
     // to dbname-matching alone.
-    IMetaStoreClient.NotificationFilter dbTblFilter = EventUtils.getDbTblNotificationFilter(dbname,tblname);
-    IMetaStoreClient.NotificationFilter dbFilter = EventUtils.getDbTblNotificationFilter(dbname,null);
+    IMetaStoreClient.NotificationFilter dbTblFilter = new DatabaseAndTableFilter(dbname,tblname);
+    IMetaStoreClient.NotificationFilter dbFilter = new DatabaseAndTableFilter(dbname,null);
 
     assertFalse(dbTblFilter.accept(null));
     assertTrue(dbTblFilter.accept(createDummyEvent(dbname, tblname, 0)));
@@ -1049,7 +1623,7 @@ public class TestReplicationScenarios {
     // within a range specified.
     long evBegin = 50;
     long evEnd = 75;
-    IMetaStoreClient.NotificationFilter evRangeFilter = EventUtils.getEventBoundaryFilter(evBegin,evEnd);
+    IMetaStoreClient.NotificationFilter evRangeFilter = new EventBoundaryFilter(evBegin,evEnd);
 
     assertTrue(evBegin < evEnd);
     assertFalse(evRangeFilter.accept(null));
@@ -1065,9 +1639,9 @@ public class TestReplicationScenarios {
     // that match a provided message format
 
     IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
-        EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat());
+        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat());
     IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
-        EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat() + "_bogus");
     NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);
 
     assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
@@ -1092,21 +1666,19 @@ public class TestReplicationScenarios {
       }
     };
 
-    assertTrue(EventUtils.andFilter(yes, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(yes, no).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, no).accept(dummyEvent));
-
-    assertTrue(EventUtils.andFilter(yes, yes, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(yes, yes, no).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(yes, no, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(yes, no, no).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, yes, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent));
-    assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent));
-
-
+    assertTrue(new AndFilter(yes, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(yes, no).accept(dummyEvent));
+    assertFalse(new AndFilter(no, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(no, no).accept(dummyEvent));
+
+    assertTrue(new AndFilter(yes, yes, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(yes, yes, no).accept(dummyEvent));
+    assertFalse(new AndFilter(yes, no, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(yes, no, no).accept(dummyEvent));
+    assertFalse(new AndFilter(no, yes, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(no, yes, no).accept(dummyEvent));
+    assertFalse(new AndFilter(no, no, yes).accept(dummyEvent));
+    assertFalse(new AndFilter(no, no, no).accept(dummyEvent));
   }
 
   private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
@@ -1137,7 +1709,7 @@ public class TestReplicationScenarios {
     if (tblName != null){
       verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
     }
-    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
     return lastReplDumpId;
   }
 
@@ -1152,7 +1724,7 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'");
     verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId);
     verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
-    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
     return lastReplDumpId;
   }
 
@@ -1174,18 +1746,25 @@ public class TestReplicationScenarios {
     return (lastResults.get(rowNum).split("\\t"))[colNum];
   }
 
+  /**
+   * Asserts that the rows fetched from the driver match {@code data}, row by row and
+   * ignoring case: both the expected and the actual value are lower-cased before they
+   * are compared. Results read from the hive output do not preserve case and come back
+   * in lower case, except for NULL values, which come back in upper case; hence the
+   * explicit lower-casing on both sides of the assert.
+   */
   private void verifyResults(String[] data) throws IOException {
     List<String> results = getOutput();
-    LOG.info("Expecting {}",data);
-    LOG.info("Got {}",results);
-    assertEquals(data.length,results.size());
-    for (int i = 0; i < data.length; i++){
-      assertEquals(data[i],results.get(i));
+    LOG.info("Expecting {}", data);
+    LOG.info("Got {}", results);
+    assertEquals(data.length, results.size());
+    for (int i = 0; i < data.length; i++) {
+      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
     }
   }
 
   private List<String> getOutput() throws IOException {
-    List<String> results = new ArrayList<String>();
+    List<String> results = new ArrayList<>();
     try {
       driver.getResults(results);
     } catch (CommandNeedRetryException e) {
@@ -1217,6 +1796,18 @@ public class TestReplicationScenarios {
     verifyResults(data);
   }
 
+  private void verifyFail(String cmd) throws RuntimeException {
+    boolean success = false;
+    try {
+      success = run(cmd,false);
+    } catch (AssertionError ae){
+      LOG.warn("AssertionError:",ae);
+      throw new RuntimeException(ae);
+    }
+    // run(cmd, false) reports the outcome through its return value; this command is expected to fail.
+    assertFalse(success);
+  }
+
   private static void run(String cmd) throws RuntimeException {
     try {
     run(cmd,false); // default arg-less run simply runs, and does not care about failure
@@ -1246,7 +1837,7 @@ public class TestReplicationScenarios {
     return success;
   }
 
-  public static void createTestDataFile(String filename, String[] lines) throws IOException {
+  private static void createTestDataFile(String filename, String[] lines) throws IOException {
     FileWriter writer = null;
     try {
       File file = new File(filename);
@@ -1261,5 +1852,4 @@ public class TestReplicationScenarios {
       }
     }
   }
-
 }