You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/11 06:56:06 UTC
[2/4] hive git commit: HIVE-16269 : enable incremental function dump
to be loaded via repl load (Anishek Agarwal via Sushanth Sowmyan)
http://git-wip-us.apache.org/repos/asf/hive/blob/91c4fa97/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
new file mode 100644
index 0000000..05c1244
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -0,0 +1,1975 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestReplicationScenarios {
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ private final static String DBNOTIF_LISTENER_CLASSNAME =
+ "org.apache.hive.hcatalog.listener.DbNotificationListener";
+ // FIXME : replace with hive copy once that is copied
+ private final static String tid =
+ TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
+ private final static String TEST_PATH =
+ System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+
+ private static HiveConf hconf;
+ private static boolean useExternalMS = false;
+ private static int msPort;
+ private static Driver driver;
+ private static HiveMetaStoreClient metaStoreClient;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+ private ArrayList<String> lastResults;
+
+ private final boolean VERIFY_SETUP_STEPS = true;
+ // if verifySetup is set to true, all the test setup we do will perform additional
+ // verifications as well, which is useful to verify that our setup occurred
+ // correctly when developing and debugging tests. These verifications, however
+ // do not test any new functionality for replication, and thus, are not relevant
+ // for testing replication itself. For steady state, we want this to be false.
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ hconf = new HiveConf(TestReplicationScenarios.class);
+ String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+ if (metastoreUri != null) {
+ hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+ useExternalMS = true;
+ return;
+ }
+
+ hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+ DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
+ hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+ hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+ hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+ msPort = MetaStoreUtils.startMetaStore(hconf);
+ hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
+ hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+ + msPort);
+ hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+ "false");
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+ Path testPath = new Path(TEST_PATH);
+ FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
+ fs.mkdirs(testPath);
+
+ driver = new Driver(hconf);
+ SessionState.start(new CliSessionState(hconf));
+ metaStoreClient = new HiveMetaStoreClient(hconf);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass(){
+ // FIXME : should clean up TEST_PATH, but not doing it now, for debugging's sake
+ }
+
+ @Before
+ public void setUp(){
+ // before each test
+ }
+
+ @After
+ public void tearDown(){
+ // after each test
+ }
+
+ private static int next = 0;
+ private synchronized void advanceDumpDir() {
+ next++;
+ ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+ }
+
+ static class Tuple {
+ final String replicatedDbName;
+ final String lastReplicationId;
+
+ Tuple(String replicatedDbName, String lastReplicationId) {
+ this.replicatedDbName = replicatedDbName;
+ this.lastReplicationId = lastReplicationId;
+ }
+ }
+
+ private Tuple loadAndVerify(String dbName) throws IOException {
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String dumpLocation = getResult(0, 0);
+ String lastReplicationId = getResult(0, 1, true);
+ String replicatedDbName = dbName + "_replicated";
+ run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ printOutput();
+ run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+ return new Tuple(replicatedDbName, lastReplicationId);
+ }
+
+ /**
+ * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
+ * Inserts data into one of the ptned tables, and one of the unptned tables,
+ * and verifies that a REPL DUMP followed by a REPL LOAD is able to load it
+ * appropriately. This tests bootstrap behaviour primarily.
+ */
+ @Test
+ public void testBasic() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+ verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+ String replicatedDbName = loadAndVerify(dbName).replicatedDbName;
+
+ verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
+ verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
+ }
+
+ @Test
+ public void testBasicWithCM() throws Exception {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+ String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+ createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(ptn_data_2);
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+
+ // Table dropped after "repl dump"
+ run("DROP TABLE " + dbName + ".unptned");
+ // Partition droppped after "repl dump"
+ run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)");
+ // File changed after "repl dump"
+ Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
+ Path loc = new Path(p.getSd().getLocation());
+ FileSystem fs = loc.getFileSystem(hconf);
+ Path file = fs.listStatus(loc)[0].getPath();
+ fs.delete(file, false);
+ fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
+
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ run("SELECT * from " + dbName + "_dupe.unptned");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ // Since partition(b=2) changed manually, Hive cannot find
+ // it in original location and cmroot, thus empty
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+ }
+
+ @Test
+ public void testBootstrapLoadOnExistingDb() throws IOException {
+ String testName = "bootstrapLoadOnExistingDb";
+ LOG.info("Testing "+testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ createTestDataFile(unptn_locn, unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ // Create an empty database to load
+ run("CREATE DATABASE " + dbName + "_empty");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ // Load to an empty database
+ run("REPL LOAD " + dbName + "_empty FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return same repl ID as dump
+ verifyRun("REPL STATUS " + dbName + "_empty", replDumpId);
+ verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data);
+
+ String[] nullReplId = new String[]{ "NULL" };
+
+ // Create a database with a table
+ run("CREATE DATABASE " + dbName + "_withtable");
+ run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE");
+ // Load using same dump to a DB with table. It should fail as DB is not empty.
+ verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return NULL
+ verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId);
+
+ // Create a database with a view
+ run("CREATE DATABASE " + dbName + "_withview");
+ run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE");
+ run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned");
+ // Load using same dump to a DB with view. It should fail as DB is not empty.
+ verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return NULL
+ verifyRun("REPL STATUS " + dbName + "_withview", nullReplId);
+ }
+
+ @Test
+ public void testIncrementalAdds() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+ verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+ // Now, we load data into the tables, and see if an incremental
+ // repl drop/load can duplicate it.
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+ run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1",ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2);
+
+ // Perform REPL-DUMP/LOAD
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId );
+ String incrementalDumpLocn = getResult(0,0);
+ String incrementalDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {incrementalDumpId});
+
+ // VERIFY tables and partitions on destination for equivalence.
+
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty);
+
+// verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+ // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+ // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+ // fixing that.
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+ }
+
+ @Test
+ public void testDrops() throws IOException {
+
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
+
+ // At this point, we've set up all the tables and ptns we're going to test drops across
+ // Replicate it first, and then we'll drop it on the source.
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId});
+
+ verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2);
+
+ // All tables good on destination, drop on source.
+
+ run("DROP TABLE " + dbName + ".unptned");
+ run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+ run("DROP TABLE " + dbName + ".ptned2");
+ run("ALTER TABLE " + dbName + ".ptned3 DROP PARTITION (b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty);
+ verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2);
+
+ // replicate the incremental drops
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String postDropReplDumpLocn = getResult(0,0);
+ String postDropReplDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+ // verify that drops were replicated. This can either be from tables or ptns
+ // not existing, and thus, throwing a NoSuchObjectException, or returning nulls
+ // or select * returning empty, depending on what we're testing.
+
+ Exception e = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+ assertNull(tbl);
+ } catch (TException te) {
+ e = te;
+ }
+ assertNotNull(e);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2);
+
+ Exception e2 = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+ assertNull(tbl);
+ } catch (TException te) {
+ e2 = te;
+ }
+ assertNotNull(e2);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+ }
+
+ @Test
+ public void testDropsWithCM() throws IOException {
+
+ String testName = "drops_with_cm";
+ String dbName = createDB(testName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ run("SELECT * from " + dbName + "_dupe.unptned");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned");
+ run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned");
+ run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " +
+ dbName + ".ptned WHERE b='1'");
+ run("SELECT a from " + dbName + ".unptned_copy");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + ".ptned_copy");
+ verifyResults(ptn_data_1);
+
+ run("DROP TABLE " + dbName + ".unptned");
+ run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+ run("DROP TABLE " + dbName + ".ptned2");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned");
+ verifyResults(ptn_data_1);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String postDropReplDumpLocn = getResult(0,0);
+ String postDropReplDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+
+ // Drop table after dump
+ run("DROP TABLE " + dbName + ".unptned_copy");
+ // Drop partition after dump
+ run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')");
+
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+ Exception e = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+ assertNull(tbl);
+ } catch (TException te) {
+ e = te;
+ }
+ assertNotNull(e);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + "_dupe.ptned");
+ verifyResults(ptn_data_1);
+
+ Exception e2 = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+ assertNull(tbl);
+ } catch (TException te) {
+ e2 = te;
+ }
+ assertNotNull(e2);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+
+ run("SELECT a from " + dbName + "_dupe.unptned_copy");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned_copy");
+ verifyResults(ptn_data_1);
+ }
+
+ @Test
+ public void testAlters() throws IOException {
+
+ String testName = "alters";
+ String dbName = createDB(testName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2");
+ verifySetup("SELECT * from " + dbName + ".unptned2", unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'",ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+
+ // base tables set up, let's replicate them over
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+ verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+
+ // tables have been replicated over, and verified to be identical. Now, we do a couple of
+ // alters on the source
+
+ // Rename unpartitioned table
+ run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn");
+ verifySetup("SELECT * from " + dbName + ".unptned_rn", unptn_data);
+
+ // Alter unpartitioned table set table property
+ String testKey = "blah";
+ String testVal = "foo";
+ run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+ if (VERIFY_SETUP_STEPS){
+ try {
+ Table unptn2 = metaStoreClient.getTable(dbName,"unptned2");
+ assertTrue(unptn2.getParameters().containsKey(testKey));
+ assertEquals(testVal,unptn2.getParameters().get(testKey));
+ } catch (TException e) {
+ assertNull(e);
+ }
+ }
+
+ // alter partitioned table, rename partition
+ run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=22", ptn_data_2);
+
+ // alter partitioned table set table property
+ run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+ if (VERIFY_SETUP_STEPS){
+ try {
+ Table ptned = metaStoreClient.getTable(dbName,"ptned");
+ assertTrue(ptned.getParameters().containsKey(testKey));
+ assertEquals(testVal,ptned.getParameters().get(testKey));
+ } catch (TException e) {
+ assertNull(e);
+ }
+ }
+
+ // alter partitioned table's partition set partition property
+ // Note : No DDL way to alter a partition, so we use the MSC api directly.
+ try {
+ List<String> ptnVals1 = new ArrayList<String>();
+ ptnVals1.add("1");
+ Partition ptn1 = metaStoreClient.getPartition(dbName, "ptned", ptnVals1);
+ ptn1.getParameters().put(testKey,testVal);
+ metaStoreClient.alter_partition(dbName,"ptned",ptn1,null);
+ } catch (TException e) {
+ assertNull(e);
+ }
+
+ // rename partitioned table
+ verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
+ run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn");
+ verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2);
+
+ // All alters done, now we replicate them over.
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String postAlterReplDumpLocn = getResult(0,0);
+ String postAlterReplDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
+
+ // Replication done, we now do the following verifications:
+
+ // verify that unpartitioned table rename succeeded.
+ Exception e = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "unptned");
+ assertNull(tbl);
+ } catch (TException te) {
+ e = te;
+ }
+ assertNotNull(e);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data);
+
+ // verify that partition rename succeded.
+ try {
+ Table unptn2 = metaStoreClient.getTable(dbName + "_dupe" , "unptned2");
+ assertTrue(unptn2.getParameters().containsKey(testKey));
+ assertEquals(testVal,unptn2.getParameters().get(testKey));
+ } catch (TException te) {
+ assertNull(te);
+ }
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2);
+
+ // verify that ptned table rename succeded.
+ Exception e2 = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "ptned2");
+ assertNull(tbl);
+ } catch (TException te) {
+ e2 = te;
+ }
+ assertNotNull(e2);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2);
+
+ // verify that ptned table property set worked
+ try {
+ Table ptned = metaStoreClient.getTable(dbName + "_dupe" , "ptned");
+ assertTrue(ptned.getParameters().containsKey(testKey));
+ assertEquals(testVal, ptned.getParameters().get(testKey));
+ } catch (TException te) {
+ assertNull(te);
+ }
+
+ // verify that partitioned table partition property set worked.
+ try {
+ List<String> ptnVals1 = new ArrayList<String>();
+ ptnVals1.add("1");
+ Partition ptn1 = metaStoreClient.getPartition(dbName + "_dupe", "ptned", ptnVals1);
+ assertTrue(ptn1.getParameters().containsKey(testKey));
+ assertEquals(testVal,ptn1.getParameters().get(testKey));
+ } catch (TException te) {
+ assertNull(te);
+ }
+
+ }
+
+ @Test
+ public void testIncrementalLoad() throws IOException {
+ String testName = "incrementalLoad";
+ String dbName = createDB(testName);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName
+ + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+ String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" };
+ String[] empty = new String[] {};
+
+ String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
+ verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+ run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+ + ".ptned PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName
+ + ".ptned PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+ run("CREATE TABLE " + dbName
+ + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+ + ".ptned WHERE b=1");
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1);
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+ + ".ptned WHERE b=2");
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+ }
+
+ @Test
+ public void testIncrementalInserts() throws IOException {
+ String testName = "incrementalInserts";
+ String dbName = createDB(testName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+ run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+ String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+ run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+ }
+
+ @Test
+ public void testIncrementalInsertToPartition() throws IOException {
+ String testName = "incrementalInsertToPartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ // Insert overwrite on existing partition
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+ // Insert overwrite on dynamic partition
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
+ }
+
+ @Test
+ public void testInsertToMultiKeyPartition() throws IOException {
+ String testName = "insertToMultiKeyPartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".namelist(name string) partitioned by (year int, month int, day int) STORED AS TEXTFILE");
+ run("USE " + dbName);
+
+ String[] ptn_data_1 = new String[] { "abraham", "bob", "carter" };
+ String[] ptn_year_1980 = new String[] { "abraham", "bob" };
+ String[] ptn_day_1 = new String[] { "abraham", "carter" };
+ String[] ptn_year_1984_month_4_day_1_1 = new String[] { "carter" };
+ String[] ptn_list_1 = new String[] { "year=1980/month=4/day=1", "year=1980/month=5/day=5", "year=1984/month=4/day=1" };
+
+ run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1980,month=4,day=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1980,month=5,day=5) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1984,month=4,day=1) values('" + ptn_data_1[2] + "')");
+
+ verifySetup("SELECT name from " + dbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980);
+ verifySetup("SELECT name from " + dbName + ".namelist where (day=1) ORDER BY name", ptn_day_1);
+ verifySetup("SELECT name from " + dbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+ ptn_year_1984_month_4_day_1_1);
+ verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_1);
+ verifySetup("SHOW PARTITIONS " + dbName + ".namelist", ptn_list_1);
+ verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)",
+ "location", "namelist/year=1980/month=4/day=1");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+ ptn_year_1984_month_4_day_1_1);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_1);
+ verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_1);
+
+ run("USE " + dbName + "_dupe");
+ verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)",
+ "location", "namelist/year=1980/month=4/day=1");
+ run("USE " + dbName);
+
+ String[] ptn_data_2 = new String[] { "abraham", "bob", "carter", "david", "eugene" };
+ String[] ptn_year_1984_month_4_day_1_2 = new String[] { "carter", "david" };
+ String[] ptn_day_1_2 = new String[] { "abraham", "carter", "david" };
+ String[] ptn_list_2 = new String[] { "year=1980/month=4/day=1", "year=1980/month=5/day=5",
+ "year=1984/month=4/day=1", "year=1990/month=5/day=25" };
+
+ run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1984,month=4,day=1) values('" + ptn_data_2[3] + "')");
+ run("INSERT INTO TABLE " + dbName + ".namelist partition(year=1990,month=5,day=25) values('" + ptn_data_2[4] + "')");
+
+ verifySetup("SELECT name from " + dbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980);
+ verifySetup("SELECT name from " + dbName + ".namelist where (day=1) ORDER BY name", ptn_day_1_2);
+ verifySetup("SELECT name from " + dbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+ ptn_year_1984_month_4_day_1_2);
+ verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_2);
+ verifyRun("SHOW PARTITIONS " + dbName + ".namelist", ptn_list_2);
+ verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
+ "location", "namelist/year=1990/month=5/day=25");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1_2);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name",
+ ptn_year_1984_month_4_day_1_2);
+ verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_2);
+ verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_2);
+ run("USE " + dbName + "_dupe");
+ verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
+ "location", "namelist/year=1990/month=5/day=25");
+ run("USE " + dbName);
+
+ String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" };
+ String[] data_after_ovwrite = new String[] { "fisher" };
+ // Insert overwrite on existing partition
+ run("INSERT OVERWRITE TABLE " + dbName + ".namelist partition(year=1990,month=5,day=25) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT name from " + dbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite);
+ verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_3);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifySetup("SELECT name from " + dbName + "_dupe.namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite);
+ verifySetup("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_3);
+ }
+
+ @Test
+ public void testViewsReplication() throws IOException {
+ String testName = "viewsReplication";
+ String dbName = createDB(testName);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE VIEW " + dbName + ".virtual_view AS SELECT * FROM " + dbName + ".unptned");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ verifySetup("SELECT a from " + dbName + ".ptned", empty);
+ verifySetup("SELECT * from " + dbName + ".unptned", empty);
+ verifySetup("SELECT * from " + dbName + ".virtual_view", empty);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ verifySetup("SELECT * from " + dbName + ".virtual_view", unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+ run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view AS SELECT a FROM " + dbName + ".ptned where b=1");
+ verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ LOG.info("Bootstrap-dump: Dumped to {} with id {}",replDumpLocn,replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+
+ run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+ verifySetup("SELECT a from " + dbName + ".virtual_view2", ptn_data_2);
+
+ // Create a view with name already exist. Just to verify if failure flow clears the added create_table event.
+ run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+
+ run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view2 AS SELECT * FROM " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data);
+
+ // Perform REPL-DUMP/LOAD
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId );
+ String incrementalDumpLocn = getResult(0,0);
+ String incrementalDumpId = getResult(0,1,true);
+ LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {incrementalDumpId});
+
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where b=1", ptn_data_1);
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view2", ptn_data_2);
+ verifyRun("SELECT * from " + dbName + "_dupe.mat_view2", unptn_data);
+ }
+
+ @Test
+ public void testDumpLimit() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+ String[] unptn_data = new String[] { "eleven", "thirteen", "twelve" };
+ String[] unptn_data_load1 = new String[] { "eleven" };
+ String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+ // 3 events to insert, last repl ID: replDumpId+3
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ // 3 events to insert, last repl ID: replDumpId+6
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ // 3 events to insert, last repl ID: replDumpId+9
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+ advanceDumpDir();
+ Integer lastReplID = Integer.valueOf(replDumpId);
+ lastReplID += 1000;
+ String toReplID = String.valueOf(lastReplID);
+
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ }
+
+ @Test
+ public void testExchangePartition() throws IOException {
+ String testName = "exchangePartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned_src(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_dest(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+
+ String[] empty = new String[] {};
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned_src ADD PARTITION (b=2, c=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[2] + "')");
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+ // Exchange single partitions using complete partition-spec (all partition columns)
+ run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=1, c=1) WITH TABLE " + dbName + ".ptned_src");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+ // Exchange multiple partitions using partial partition-spec (only one partition column)
+ run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=2) WITH TABLE " + dbName + ".ptned_src");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ }
+
+ @Test
+ public void testTruncateTable() throws IOException {
+ String testName = "truncateTable";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+ String[] empty = new String[] {};
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+
+ run("TRUNCATE TABLE " + dbName + ".unptned");
+ verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+ String[] unptn_data_after_ins = new String[] { "thirteen" };
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+ }
+
+ @Test
+ public void testTruncatePartitionedTable() throws IOException {
+ String testName = "truncatePartitionedTable";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+ String[] empty = new String[] {};
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+ verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+ run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+ run("TRUNCATE TABLE " + dbName + ".ptned_2");
+ verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
+ }
+
+ @Test
+ public void testTruncateWithCM() throws IOException {
+ String testName = "truncateWithCM";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+ String[] empty = new String[] {};
+ String[] unptn_data = new String[] { "eleven", "thirteen" };
+ String[] unptn_data_load1 = new String[] { "eleven" };
+ String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+ // 3 events to insert, last repl ID: replDumpId+3
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ // 3 events to insert, last repl ID: replDumpId+6
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ // 1 event to truncate, last repl ID: replDumpId+8
+ run("TRUNCATE TABLE " + dbName + ".unptned");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty);
+ // 3 events to insert, last repl ID: replDumpId+11
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ // Dump and load only first insert (1 record)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+ // Dump and load only second insert (2 records)
+ advanceDumpDir();
+ Integer lastReplID = Integer.valueOf(replDumpId);
+ lastReplID += 1000;
+ String toReplID = String.valueOf(lastReplID);
+
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+ // Dump and load only truncate (0 records)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 2");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty);
+
+ // Dump and load insert after truncate (1 record)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+ }
+
+ @Test
+ public void testStatus() throws IOException {
+ // first test ReplStateMap functionality
+ Map<String,Long> cmap = new ReplStateMap<String,Long>();
+
+ Long oldV;
+ oldV = cmap.put("a",1L);
+ assertEquals(1L,cmap.get("a").longValue());
+ assertEquals(null,oldV);
+
+ cmap.put("b",2L);
+ oldV = cmap.put("b",-2L);
+ assertEquals(2L, cmap.get("b").longValue());
+ assertEquals(2L, oldV.longValue());
+
+ cmap.put("c",3L);
+ oldV = cmap.put("c",33L);
+ assertEquals(33L, cmap.get("c").longValue());
+ assertEquals(3L, oldV.longValue());
+
+ // Now, to actually testing status - first, we bootstrap.
+
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String lastReplDumpLocn = getResult(0, 0);
+ String lastReplDumpId = getResult(0, 1, true);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'");
+
+ // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs.
+ // Both db-level and table-level repl.last.id must be updated.
+
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+ "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_rn");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)");
+ lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId,
+ "DROP TABLE " + dbName + ".ptned_rn");
+
+ // DB-level REPL LOADs testing done, now moving on to table level repl loads.
+ // In each of these cases, the table-level repl.last.id must move forward, but the
+ // db-level last.repl.id must not.
+
+ String lastTblReplDumpId = lastReplDumpId;
+ lastTblReplDumpId = verifyAndReturnTblReplStatus(
+ dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+ "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE");
+ lastTblReplDumpId = verifyAndReturnTblReplStatus(
+ dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)");
+ lastTblReplDumpId = verifyAndReturnTblReplStatus(
+ dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)");
+ lastTblReplDumpId = verifyAndReturnTblReplStatus(
+ dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')");
+ // Note : Not testing table rename because table rename replication is not supported for table-level repl.
+ String finalTblReplDumpId = verifyAndReturnTblReplStatus(
+ dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+ "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)");
+
+ assertTrue(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0);
+
+ // TODO : currently not testing the following scenarios:
+ // a) Multi-db wh-level REPL LOAD - need to add that
+ // b) Insert into tables - quite a few cases need to be enumerated there, including dyn adds.
+
+ }
+
+ private static String createDB(String name) {
+ LOG.info("Testing " + name);
+ String dbName = name + "_" + tid;
+ run("CREATE DATABASE " + dbName);
+ return dbName;
+ }
+
+ @Test
+ public void testEventFilters(){
+ // Test testing that the filters introduced by EventUtils are working correctly.
+
+ // The current filters we use in ReplicationSemanticAnalyzer is as follows:
+ // IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
+ // EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern),
+ // EventUtils.getEventBoundaryFilter(eventFrom, eventTo),
+ // EventUtils.restrictByMessageFormat(MessageFa
<TRUNCATED>