You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/11 06:56:06 UTC

[2/4] hive git commit: HIVE-16269 : enable incremental function dump to be loaded via repl load (Anishek Agarwal via Sushanth Sowmyan)

http://git-wip-us.apache.org/repos/asf/hive/blob/91c4fa97/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
new file mode 100644
index 0000000..05c1244
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -0,0 +1,1975 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestReplicationScenarios {
+
+  @Rule
+  public final TestName testName = new TestName();
+
+  private final static String DBNOTIF_LISTENER_CLASSNAME =
+      "org.apache.hive.hcatalog.listener.DbNotificationListener";
+      // FIXME : replace with hive copy once that is copied
+  private final static String tid =
+      TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
+  private final static String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+
+  private static HiveConf hconf;
+  private static boolean useExternalMS = false;
+  private static int msPort;
+  private static Driver driver;
+  private static HiveMetaStoreClient metaStoreClient;
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+  private ArrayList<String> lastResults;
+
+  private final boolean VERIFY_SETUP_STEPS = true;
+  // if verifySetup is set to true, all the test setup we do will perform additional
+  // verifications as well, which is useful to verify that our setup occurred
+  // correctly when developing and debugging tests. These verifications, however
+  // do not test any new functionality for replication, and thus, are not relevant
+  // for testing replication itself. For steady state, we want this to be false.
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    hconf = new HiveConf(TestReplicationScenarios.class);
+    String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+    if (metastoreUri != null) {
+      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+      useExternalMS = true;
+      return;
+    }
+
+    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+        DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
+    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+    msPort = MetaStoreUtils.startMetaStore(hconf);
+    hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
+    hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+        + msPort);
+    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+        "false");
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+    Path testPath = new Path(TEST_PATH);
+    FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
+    fs.mkdirs(testPath);
+
+    driver = new Driver(hconf);
+    SessionState.start(new CliSessionState(hconf));
+    metaStoreClient = new HiveMetaStoreClient(hconf);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass(){
+    // FIXME : should clean up TEST_PATH, but not doing it now, for debugging's sake
+  }
+
+  @Before
+  public void setUp(){
+    // before each test
+  }
+
+  @After
+  public void tearDown(){
+    // after each test
+  }
+
+  private static  int next = 0;
+  private synchronized void advanceDumpDir() {
+    next++;
+    ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+  }
+
+ static class Tuple {
+    final String replicatedDbName;
+    final String lastReplicationId;
+
+    Tuple(String replicatedDbName, String lastReplicationId) {
+      this.replicatedDbName = replicatedDbName;
+      this.lastReplicationId = lastReplicationId;
+    }
+  }
+
+  private Tuple loadAndVerify(String dbName) throws IOException {
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String dumpLocation = getResult(0, 0);
+    String lastReplicationId = getResult(0, 1, true);
+    String replicatedDbName = dbName + "_replicated";
+    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    printOutput();
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+    return new Tuple(replicatedDbName, lastReplicationId);
+  }
+
+  /**
+   * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
+   * Inserts data into one of the ptned tables, and one of the unptned tables,
+   * and verifies that a REPL DUMP followed by a REPL LOAD is able to load it
+   * appropriately. This tests bootstrap behaviour primarily.
+   */
+  @Test
+  public void testBasic() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+    String replicatedDbName = loadAndVerify(dbName).replicatedDbName;
+
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
+    verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
+  }
+
+  @Test
+  public void testBasicWithCM() throws Exception {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+    String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+    createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+
+    // Table dropped after "repl dump"
+    run("DROP TABLE " + dbName + ".unptned");
+    // Partition droppped after "repl dump"
+    run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)");
+    // File changed after "repl dump"
+    Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
+    Path loc = new Path(p.getSd().getLocation());
+    FileSystem fs = loc.getFileSystem(hconf);
+    Path file = fs.listStatus(loc)[0].getPath();
+    fs.delete(file, false);
+    fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
+
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    // Since partition(b=2) changed manually, Hive cannot find
+    // it in original location and cmroot, thus empty
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+  }
+
+  @Test
+  public void testBootstrapLoadOnExistingDb() throws IOException {
+    String testName = "bootstrapLoadOnExistingDb";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    createTestDataFile(unptn_locn, unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    // Create an empty database to load
+    run("CREATE DATABASE " + dbName + "_empty");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    // Load to an empty database
+    run("REPL LOAD " + dbName + "_empty FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return same repl ID as dump
+    verifyRun("REPL STATUS " + dbName + "_empty", replDumpId);
+    verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data);
+
+    String[] nullReplId = new String[]{ "NULL" };
+
+    // Create a database with a table
+    run("CREATE DATABASE " + dbName + "_withtable");
+    run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE");
+    // Load using same dump to a DB with table. It should fail as DB is not empty.
+    verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return NULL
+    verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId);
+
+    // Create a database with a view
+    run("CREATE DATABASE " + dbName + "_withview");
+    run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE");
+    run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned");
+    // Load using same dump to a DB with view. It should fail as DB is not empty.
+    verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'");
+
+    // REPL STATUS should return NULL
+    verifyRun("REPL STATUS " + dbName + "_withview", nullReplId);
+  }
+
+  @Test
+  public void testIncrementalAdds() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+    // Now, we load data into the tables, and see if an incremental
+    // repl drop/load can duplicate it.
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+    run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1",ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2);
+
+    // Perform REPL-DUMP/LOAD
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId );
+    String incrementalDumpLocn = getResult(0,0);
+    String incrementalDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {incrementalDumpId});
+
+    // VERIFY tables and partitions on destination for equivalence.
+
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty);
+
+//    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+    // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+    // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+    // fixing that.
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+  }
+
+  @Test
+  public void testDrops() throws IOException {
+
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
+
+    // At this point, we've set up all the tables and ptns we're going to test drops across
+    // Replicate it first, and then we'll drop it on the source.
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId});
+
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2);
+
+    // All tables good on destination, drop on source.
+
+    run("DROP TABLE " + dbName + ".unptned");
+    run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+    run("DROP TABLE " + dbName + ".ptned2");
+    run("ALTER TABLE " + dbName + ".ptned3 DROP PARTITION (b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty);
+    verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2);
+
+    // replicate the incremental drops
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String postDropReplDumpLocn = getResult(0,0);
+    String postDropReplDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+    // verify that drops were replicated. This can either be from tables or ptns
+    // not existing, and thus, throwing a NoSuchObjectException, or returning nulls
+    // or select * returning empty, depending on what we're testing.
+
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2);
+
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+  }
+
+  @Test
+  public void testDropsWithCM() throws IOException {
+
+    String testName = "drops_with_cm";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned");
+    run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned");
+    run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " +
+        dbName + ".ptned WHERE b='1'");
+    run("SELECT a from " + dbName + ".unptned_copy");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + ".ptned_copy");
+    verifyResults(ptn_data_1);
+
+    run("DROP TABLE " + dbName + ".unptned");
+    run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+    run("DROP TABLE " + dbName + ".ptned2");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + ".ptned");
+    verifyResults(ptn_data_1);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String postDropReplDumpLocn = getResult(0,0);
+    String postDropReplDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+
+    // Drop table after dump
+    run("DROP TABLE " + dbName + ".unptned_copy");
+    // Drop partition after dump
+    run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')");
+
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + "_dupe.ptned");
+    verifyResults(ptn_data_1);
+
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.unptned_copy");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned_copy");
+    verifyResults(ptn_data_1);
+  }
+
+  @Test
+  public void testAlters() throws IOException {
+
+    String testName = "alters";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2");
+    verifySetup("SELECT * from " + dbName + ".unptned2", unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'",ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+
+    // base tables set up, let's replicate them over
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+
+    // tables have been replicated over, and verified to be identical. Now, we do a couple of
+    // alters on the source
+
+    // Rename unpartitioned table
+    run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn");
+    verifySetup("SELECT * from " + dbName + ".unptned_rn", unptn_data);
+
+    // Alter unpartitioned table set table property
+    String testKey = "blah";
+    String testVal = "foo";
+    run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+    if (VERIFY_SETUP_STEPS){
+      try {
+        Table unptn2 = metaStoreClient.getTable(dbName,"unptned2");
+        assertTrue(unptn2.getParameters().containsKey(testKey));
+        assertEquals(testVal,unptn2.getParameters().get(testKey));
+      } catch (TException e) {
+        assertNull(e);
+      }
+    }
+
+    // alter partitioned table, rename partition
+    run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=22", ptn_data_2);
+
+    // alter partitioned table set table property
+    run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+    if (VERIFY_SETUP_STEPS){
+      try {
+        Table ptned = metaStoreClient.getTable(dbName,"ptned");
+        assertTrue(ptned.getParameters().containsKey(testKey));
+        assertEquals(testVal,ptned.getParameters().get(testKey));
+      } catch (TException e) {
+        assertNull(e);
+      }
+    }
+
+    // alter partitioned table's partition set partition property
+    // Note : No DDL way to alter a partition, so we use the MSC api directly.
+    try {
+      List<String> ptnVals1 = new ArrayList<String>();
+      ptnVals1.add("1");
+      Partition ptn1 = metaStoreClient.getPartition(dbName, "ptned", ptnVals1);
+      ptn1.getParameters().put(testKey,testVal);
+      metaStoreClient.alter_partition(dbName,"ptned",ptn1,null);
+    } catch (TException e) {
+      assertNull(e);
+    }
+
+    // rename partitioned table
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
+    run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn");
+    verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2);
+
+    // All alters done, now we replicate them over.
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String postAlterReplDumpLocn = getResult(0,0);
+    String postAlterReplDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
+
+    // Replication done, we now do the following verifications:
+
+    // verify that unpartitioned table rename succeeded.
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data);
+
+    // verify that partition rename succeded.
+    try {
+      Table unptn2 = metaStoreClient.getTable(dbName + "_dupe" , "unptned2");
+      assertTrue(unptn2.getParameters().containsKey(testKey));
+      assertEquals(testVal,unptn2.getParameters().get(testKey));
+    } catch (TException te) {
+      assertNull(te);
+    }
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2);
+
+    // verify that ptned table rename succeded.
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2);
+
+    // verify that ptned table property set worked
+    try {
+      Table ptned = metaStoreClient.getTable(dbName + "_dupe" , "ptned");
+      assertTrue(ptned.getParameters().containsKey(testKey));
+      assertEquals(testVal, ptned.getParameters().get(testKey));
+    } catch (TException te) {
+      assertNull(te);
+    }
+
+    // verify that partitioned table partition property set worked.
+    try {
+      List<String> ptnVals1 = new ArrayList<String>();
+      ptnVals1.add("1");
+      Partition ptn1 = metaStoreClient.getPartition(dbName + "_dupe", "ptned", ptnVals1);
+      assertTrue(ptn1.getParameters().containsKey(testKey));
+      assertEquals(testVal,ptn1.getParameters().get(testKey));
+    } catch (TException te) {
+      assertNull(te);
+    }
+
+  }
+
+  @Test
+  public void testIncrementalLoad() throws IOException {
+    String testName = "incrementalLoad";
+    String dbName = createDB(testName);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName
+        + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" };
+    String[] empty = new String[] {};
+
+    String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+    run("CREATE TABLE " + dbName
+        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+        + ".ptned WHERE b=1");
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+        + ".ptned WHERE b=2");
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+  }
+
+  @Test
+  public void testIncrementalInserts() throws IOException {
+    String testName = "incrementalInserts";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+    String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+    run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+  }
+
+  @Test
+  public void testIncrementalInsertToPartition() throws IOException {
+    String testName = "incrementalInsertToPartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+
+    String[] data_after_ovwrite = new String[] { "hundred" };
+    // Insert overwrite on existing partition
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+    // Insert overwrite on dynamic partition
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
+  }
+
+  @Test
+  public void testInsertToMultiKeyPartition() throws IOException {
+    String testName = "insertToMultiKeyPartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".namelist(name string) partitioned by (year int, month int, day int) STORED AS TEXTFILE");
+    run("USE " + dbName);
+
+    String[] ptn_data_1 = new String[] { "abraham", "bob", "carter" };
+    String[] ptn_year_1980 = new String[] { "abraham", "bob" };
+    String[] ptn_day_1 = new String[] { "abraham", "carter" };
+    String[] ptn_year_1984_month_4_day_1_1 = new String[] { "carter" };
+    String[] ptn_list_1 = new String[] { "year=1980/month=4/day=1", "year=1980/month=5/day=5", "year=1984/month=4/day=1" };
+
+    run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1980,month=4,day=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1980,month=5,day=5) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1984,month=4,day=1) values('" + ptn_data_1[2] + "')");
+
+    verifySetup("SELECT name from " + dbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980);
+    verifySetup("SELECT name from " + dbName + ".namelist where (day=1) ORDER BY name", ptn_day_1);
+    verifySetup("SELECT name from " + dbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+                                                                                ptn_year_1984_month_4_day_1_1);
+    verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_1);
+    verifySetup("SHOW PARTITIONS " + dbName + ".namelist", ptn_list_1);
+    verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)",
+                              "location", "namelist/year=1980/month=4/day=1");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+                                                                                   ptn_year_1984_month_4_day_1_1);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_1);
+    verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_1);
+
+    run("USE " + dbName + "_dupe");
+    verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)",
+            "location", "namelist/year=1980/month=4/day=1");
+    run("USE " + dbName);
+
+    String[] ptn_data_2 = new String[] { "abraham", "bob", "carter", "david", "eugene" };
+    String[] ptn_year_1984_month_4_day_1_2 = new String[] { "carter", "david" };
+    String[] ptn_day_1_2 = new String[] { "abraham", "carter", "david" };
+    String[] ptn_list_2 = new String[] { "year=1980/month=4/day=1", "year=1980/month=5/day=5",
+                                         "year=1984/month=4/day=1", "year=1990/month=5/day=25" };
+
+    run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1984,month=4,day=1) values('" + ptn_data_2[3] + "')");
+    run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1990,month=5,day=25) values('" + ptn_data_2[4] + "')");
+
+    verifySetup("SELECT name from " + dbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980);
+    verifySetup("SELECT name from " + dbName + ".namelist where (day=1) ORDER BY name", ptn_day_1_2);
+    verifySetup("SELECT name from " + dbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+                                                                                ptn_year_1984_month_4_day_1_2);
+    verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_2);
+    verifyRun("SHOW PARTITIONS " + dbName + ".namelist", ptn_list_2);
+    verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
+            "location", "namelist/year=1990/month=5/day=25");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1_2);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+                                                                                   ptn_year_1984_month_4_day_1_2);
+    verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_2);
+    verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_2);
+    run("USE " + dbName + "_dupe");
+    verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
+            "location", "namelist/year=1990/month=5/day=25");
+    run("USE " + dbName);
+
+    String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" };
+    String[] data_after_ovwrite = new String[] { "fisher" };
+    // Insert overwrite on existing partition
+    run("INSERT OVERWRITE TABLE " + dbName + ".namelist partition(year=1990,month=5,day=25) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT name from " + dbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite);
+    verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_3);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifySetup("SELECT name from " + dbName + "_dupe.namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite);
+    verifySetup("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_3);
+  }
+
+  @Test
+  public void testViewsReplication() throws IOException {
+    String testName = "viewsReplication";
+    String dbName = createDB(testName);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE VIEW " + dbName + ".virtual_view AS SELECT * FROM " + dbName + ".unptned");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned", empty);
+    verifySetup("SELECT * from " + dbName + ".unptned", empty);
+    verifySetup("SELECT * from " + dbName + ".virtual_view", empty);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+    verifySetup("SELECT * from " + dbName + ".virtual_view", unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+    run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view AS SELECT a FROM " + dbName + ".ptned where b=1");
+    verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    LOG.info("Bootstrap-dump: Dumped to {} with id {}",replDumpLocn,replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+
+    run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+    verifySetup("SELECT a from " + dbName + ".virtual_view2", ptn_data_2);
+
+    // Create a view with name already exist. Just to verify if failure flow clears the added create_table event.
+    run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+
+    run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view2 AS SELECT * FROM " + dbName + ".unptned");
+    verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data);
+
+    // Perform REPL-DUMP/LOAD
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId );
+    String incrementalDumpLocn = getResult(0,0);
+    String incrementalDumpId = getResult(0,1,true);
+    LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {incrementalDumpId});
+
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where b=1", ptn_data_1);
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+    verifyRun("SELECT * from " + dbName + "_dupe.virtual_view2", ptn_data_2);
+    verifyRun("SELECT * from " + dbName + "_dupe.mat_view2", unptn_data);
+  }
+
+  @Test
+  public void testDumpLimit() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+    String[] unptn_data = new String[] { "eleven", "thirteen", "twelve" };
+    String[] unptn_data_load1 = new String[] { "eleven" };
+    String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+    // 3 events to insert, last repl ID: replDumpId+3
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    // 3 events to insert, last repl ID: replDumpId+6
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    // 3 events to insert, last repl ID: replDumpId+9
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+    advanceDumpDir();
+    Integer lastReplID = Integer.valueOf(replDumpId);
+    lastReplID += 1000;
+    String toReplID = String.valueOf(lastReplID);
+
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+  }
+
+  @Test
+  public void testExchangePartition() throws IOException {
+    String testName = "exchangePartition";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_src(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_dest(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+
+    String[] empty = new String[] {};
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[2] + "')");
+
+    run("ALTER TABLE " + dbName + ".ptned_src ADD PARTITION (b=2, c=2)");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+    // Exchange single partitions using complete partition-spec (all partition columns)
+    run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=1, c=1) WITH TABLE " + dbName + ".ptned_src");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+    // Exchange multiple partitions using partial partition-spec (only one partition column)
+    run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=2) WITH TABLE " + dbName + ".ptned_src");
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3)", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+  }
+
+  @Test
+  public void testTruncateTable() throws IOException {
+    String testName = "truncateTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+    String[] unptn_data_after_ins = new String[] { "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+  }
+
+  @Test
+  public void testTruncatePartitionedTable() throws IOException {
+    String testName = "truncatePartitionedTable";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+    String[] empty = new String[] {};
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+    run("TRUNCATE TABLE " + dbName + ".ptned_2");
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
+  }
+
+  @Test
+  public void testTruncateWithCM() throws IOException {
+    String testName = "truncateWithCM";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+    String[] empty = new String[] {};
+    String[] unptn_data = new String[] { "eleven", "thirteen" };
+    String[] unptn_data_load1 = new String[] { "eleven" };
+    String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+    // 3 events to insert, last repl ID: replDumpId+3
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    // 3 events to insert, last repl ID: replDumpId+6
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+    // 1 event to truncate, last repl ID: replDumpId+8
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty);
+    // 3 events to insert, last repl ID: replDumpId+11
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    // Dump and load only first insert (1 record)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+    // Dump and load only second insert (2 records)
+    advanceDumpDir();
+    Integer lastReplID = Integer.valueOf(replDumpId);
+    lastReplID += 1000;
+    String toReplID = String.valueOf(lastReplID);
+
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+    // Dump and load only truncate (0 records)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 2");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty);
+
+    // Dump and load insert after truncate (1 record)
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+  }
+
+  @Test
+  public void testStatus() throws IOException {
+    // first test ReplStateMap functionality
+    Map<String,Long> cmap = new ReplStateMap<String,Long>();
+
+    Long oldV;
+    oldV = cmap.put("a",1L);
+    assertEquals(1L,cmap.get("a").longValue());
+    assertEquals(null,oldV);
+
+    cmap.put("b",2L);
+    oldV = cmap.put("b",-2L);
+    assertEquals(2L, cmap.get("b").longValue());
+    assertEquals(2L, oldV.longValue());
+
+    cmap.put("c",3L);
+    oldV = cmap.put("c",33L);
+    assertEquals(33L, cmap.get("c").longValue());
+    assertEquals(3L, oldV.longValue());
+
+    // Now, to actually testing status - first, we bootstrap.
+
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String lastReplDumpLocn = getResult(0, 0);
+    String lastReplDumpId = getResult(0, 1, true);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'");
+
+    // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs.
+    // Both db-level and table-level repl.last.id must be updated.
+
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+        "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned RENAME TO  " + dbName + ".ptned_rn");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId,
+        "DROP TABLE " + dbName + ".ptned_rn");
+
+    // DB-level REPL LOADs testing done, now moving on to table level repl loads.
+    // In each of these cases, the table-level repl.last.id must move forward, but the
+    // db-level last.repl.id must not.
+
+    String lastTblReplDumpId = lastReplDumpId;
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')");
+    // Note : Not testing table rename because table rename replication is not supported for table-level repl.
+    String finalTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)");
+
+    assertTrue(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0);
+
+    // TODO : currently not testing the following scenarios:
+    //   a) Multi-db wh-level REPL LOAD - need to add that
+    //   b) Insert into tables - quite a few cases need to be enumerated there, including dyn adds.
+
+  }
+
+  private static String createDB(String name) {
+    LOG.info("Testing " + name);
+    String dbName = name + "_" + tid;
+    run("CREATE DATABASE " + dbName);
+    return dbName;
+  }
+
+  @Test
+  public void testEventFilters(){
+    // Test testing that the filters introduced by EventUtils are working correctly.
+
+    // The current filters we use in ReplicationSemanticAnalyzer is as follows:
+    //    IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
+    //        EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern),
+    //        EventUtils.getEventBoundaryFilter(eventFrom, eventTo),
+    //        EventUtils.restrictByMessageFormat(MessageFa

<TRUNCATED>