You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:22 UTC
[35/51] [partial] hive git commit: HIVE-14671 : merge master into
hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index c26a075..5173d8b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -28,20 +28,23 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.util.Shell;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,17 +64,22 @@ import static org.junit.Assert.assertNull;
public class TestReplicationScenarios {
- final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+ @Rule
+ public final TestName testName = new TestName();
+
+ private final static String DBNOTIF_LISTENER_CLASSNAME =
+ "org.apache.hive.hcatalog.listener.DbNotificationListener";
// FIXME : replace with hive copy once that is copied
- final static String tid =
+ private final static String tid =
TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
- final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+ private final static String TEST_PATH =
+ System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
- static HiveConf hconf;
- static boolean useExternalMS = false;
- static int msPort;
- static Driver driver;
- static HiveMetaStoreClient metaStoreClient;
+ private static HiveConf hconf;
+ private static boolean useExternalMS = false;
+ private static int msPort;
+ private static Driver driver;
+ private static HiveMetaStoreClient metaStoreClient;
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private ArrayList<String> lastResults;
@@ -93,7 +101,7 @@ public class TestReplicationScenarios {
return;
}
- hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+ hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -140,6 +148,32 @@ public class TestReplicationScenarios {
ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
}
+ @Test
+ public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
+ String dbName = createDB(testName.getMethodName());
+ run("CREATE FUNCTION " + dbName
+ + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+ + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+ String replicatedDbName = loadAndVerify(dbName);
+ run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
+ verifyResults(new String[] { replicatedDbName + ".testFunction" });
+ }
+
+ private String loadAndVerify(String dbName) throws IOException {
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String dumpLocation = getResult(0, 0);
+ String lastReplicationId = getResult(0, 1, true);
+ String replicatedDbName = dbName + "_replicated";
+ run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ printOutput();
+ run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+ return replicatedDbName;
+ }
+
+
/**
* Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
* Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -148,12 +182,8 @@ public class TestReplicationScenarios {
*/
@Test
public void testBasic() throws IOException {
-
- String testName = "basic";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -164,9 +194,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -181,31 +211,19 @@ public class TestReplicationScenarios {
verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
- advanceDumpDir();
- run("REPL DUMP " + dbName);
- String replDumpLocn = getResult(0,0);
- String replDumpId = getResult(0,1,true);
- run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
- printOutput();
- run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-
- verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
+ String replicatedDbName = loadAndVerify(dbName);
- verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
- verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+ verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
}
@Test
public void testBasicWithCM() throws Exception {
-
- String testName = "basic_with_cm";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -217,10 +235,10 @@ public class TestReplicationScenarios {
String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
- String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+ String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -280,12 +298,61 @@ public class TestReplicationScenarios {
}
@Test
- public void testIncrementalAdds() throws IOException {
- String testName = "incrementalAdds";
+ public void testBootstrapLoadOnExistingDb() throws IOException {
+ String testName = "bootstrapLoadOnExistingDb";
LOG.info("Testing "+testName);
String dbName = testName + "_" + tid;
run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ createTestDataFile(unptn_locn, unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ // Create an empty database to load
+ run("CREATE DATABASE " + dbName + "_empty");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ // Load to an empty database
+ run("REPL LOAD " + dbName + "_empty FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return same repl ID as dump
+ verifyRun("REPL STATUS " + dbName + "_empty", replDumpId);
+ verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data);
+
+ String[] nullReplId = new String[]{ "NULL" };
+
+ // Create a database with a table
+ run("CREATE DATABASE " + dbName + "_withtable");
+ run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE");
+ // Load using same dump to a DB with table. It should fail as DB is not empty.
+ verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return NULL
+ verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId);
+
+ // Create a database with a view
+ run("CREATE DATABASE " + dbName + "_withview");
+ run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE");
+ run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned");
+ // Load using same dump to a DB with view. It should fail as DB is not empty.
+ verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'");
+
+ // REPL STATUS should return NULL
+ verifyRun("REPL STATUS " + dbName + "_withview", nullReplId);
+ }
+
+ @Test
+ public void testIncrementalAdds() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -304,9 +371,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -368,11 +435,8 @@ public class TestReplicationScenarios {
@Test
public void testDrops() throws IOException {
- String testName = "drops";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -383,9 +447,9 @@ public class TestReplicationScenarios {
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
String[] empty = new String[]{};
- String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
- String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
- String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -482,10 +546,7 @@ public class TestReplicationScenarios {
public void testDropsWithCM() throws IOException {
String testName = "drops_with_cm";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -608,10 +669,7 @@ public class TestReplicationScenarios {
public void testAlters() throws IOException {
String testName = "alters";
- LOG.info("Testing "+testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -793,10 +851,7 @@ public class TestReplicationScenarios {
@Test
public void testIncrementalLoad() throws IOException {
String testName = "incrementalLoad";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -844,6 +899,7 @@ public class TestReplicationScenarios {
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+ ".ptned PARTITION(b=1)");
verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
@@ -873,15 +929,14 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
}
@Test
public void testIncrementalInserts() throws IOException {
String testName = "incrementalInserts";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String dbName = createDB(testName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
advanceDumpDir();
@@ -892,13 +947,14 @@ public class TestReplicationScenarios {
run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
String[] unptn_data = new String[] { "eleven", "twelve" };
+
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
- verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
- verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+ verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data);
advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -909,14 +965,227 @@ public class TestReplicationScenarios {
run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
- verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data);
+
+ String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" };
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins);
+ run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+ }
+
+ @Test
+ public void testIncrementalInsertToPartition() throws IOException {
+ String testName = "incrementalInsertToPartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
+
+ String[] data_after_ovwrite = new String[] { "hundred" };
+ // Insert overwrite on existing partition
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+ // Insert overwrite on dynamic partition
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
+ }
+
+ @Test
+ public void testViewsReplication() throws IOException {
+ String testName = "viewsReplication";
+ String dbName = createDB(testName);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE VIEW " + dbName + ".virtual_view AS SELECT * FROM " + dbName + ".unptned");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ verifySetup("SELECT a from " + dbName + ".ptned", empty);
+ verifySetup("SELECT * from " + dbName + ".unptned", empty);
+ verifySetup("SELECT * from " + dbName + ".virtual_view", empty);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
+ verifySetup("SELECT * from " + dbName + ".virtual_view", unptn_data);
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+
+ run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view AS SELECT a FROM " + dbName + ".ptned where b=1");
+ verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ LOG.info("Bootstrap-dump: Dumped to {} with id {}",replDumpLocn,replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+
+ run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+ verifySetup("SELECT a from " + dbName + ".virtual_view2", ptn_data_2);
+
+ // Create a view with name already exist. Just to verify if failure flow clears the added create_table event.
+ run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2");
+
+ run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view2 AS SELECT * FROM " + dbName + ".unptned");
+ verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data);
+
+ // Perform REPL-DUMP/LOAD
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId );
+ String incrementalDumpLocn = getResult(0,0);
+ String incrementalDumpId = getResult(0,1,true);
+ LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {incrementalDumpId});
+
+ verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where b=1", ptn_data_1);
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1);
+ verifyRun("SELECT * from " + dbName + "_dupe.virtual_view2", ptn_data_2);
+ verifyRun("SELECT * from " + dbName + "_dupe.mat_view2", unptn_data);
+ }
+
+ @Test
+ public void testDumpLimit() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+ String[] unptn_data = new String[] { "eleven", "thirteen", "twelve" };
+ String[] unptn_data_load1 = new String[] { "eleven" };
+ String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+ // 3 events to insert, last repl ID: replDumpId+3
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ // 3 events to insert, last repl ID: replDumpId+6
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ // 3 events to insert, last repl ID: replDumpId+9
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
- String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
- run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')");
- verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+ advanceDumpDir();
+ Integer lastReplID = Integer.valueOf(replDumpId);
+ lastReplID += 1000;
+ String toReplID = String.valueOf(lastReplID);
+
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -924,11 +1193,312 @@ public class TestReplicationScenarios {
incrementalDumpId = getResult(0, 1, true);
LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+ }
+
+ @Test
+ public void testExchangePartition() throws IOException {
+ String testName = "exchangePartition";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned_src(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_dest(a string) partitioned by (b int, c int) STORED AS TEXTFILE");
+
+ String[] empty = new String[] {};
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=1, c=1) values('" + ptn_data_1[2] + "')");
+
+ run("ALTER TABLE " + dbName + ".ptned_src ADD PARTITION (b=2, c=2)");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=2) values('" + ptn_data_2[2] + "')");
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_src partition(b=2, c=3) values('" + ptn_data_2[2] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+ // Exchange single partitions using complete partition-spec (all partition columns)
+ run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=1, c=1) WITH TABLE " + dbName + ".ptned_src");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty);
+
+ // Exchange multiple partitions using partial partition-spec (only one partition column)
+ run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=2) WITH TABLE " + dbName + ".ptned_src");
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3)", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2);
+ }
+
+ @Test
+ public void testTruncateTable() throws IOException {
+ String testName = "truncateTable";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[] { "eleven", "twelve" };
+ String[] empty = new String[] {};
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
- verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+ run("TRUNCATE TABLE " + dbName + ".unptned");
+ verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned", empty);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);
+
+ String[] unptn_data_after_ins = new String[] { "thirteen" };
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
+ }
+
+ @Test
+ public void testTruncatePartitionedTable() throws IOException {
+ String testName = "truncatePartitionedTable";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+ String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
+ String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
+ String[] empty = new String[] {};
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
+ run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");
+
+ verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);
+
+ run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
+ verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);
+
+ run("TRUNCATE TABLE " + dbName + ".ptned_2");
+ verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
+ verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
+ verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
+ }
+
+ @Test
+ public void testTruncateWithCM() throws IOException {
+ String testName = "truncateWithCM";
+ LOG.info("Testing " + testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0, 0);
+ String replDumpId = getResult(0, 1, true);
+ LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+ String[] empty = new String[] {};
+ String[] unptn_data = new String[] { "eleven", "thirteen" };
+ String[] unptn_data_load1 = new String[] { "eleven" };
+ String[] unptn_data_load2 = new String[] { "eleven", "thirteen" };
+
+ // 3 events to insert, last repl ID: replDumpId+3
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+ // 3 events to insert, last repl ID: replDumpId+6
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
+ // 1 event to truncate, last repl ID: replDumpId+8
+ run("TRUNCATE TABLE " + dbName + ".unptned");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty);
+ // 3 events to insert, last repl ID: replDumpId+11
+ run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ // Dump and load only first insert (1 record)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+ String incrementalDumpLocn = getResult(0, 0);
+ String incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
+
+ // Dump and load only second insert (2 records)
+ advanceDumpDir();
+ Integer lastReplID = Integer.valueOf(replDumpId);
+ lastReplID += 1000;
+ String toReplID = String.valueOf(lastReplID);
+
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2);
+
+ // Dump and load only truncate (0 records)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 2");
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty);
+
+ // Dump and load insert after truncate (1 record)
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ incrementalDumpLocn = getResult(0, 0);
+ incrementalDumpId = getResult(0, 1, true);
+ LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+ replDumpId = incrementalDumpId;
+ run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1);
}
@Test
@@ -953,11 +1523,8 @@ public class TestReplicationScenarios {
// Now, to actually testing status - first, we bootstrap.
- String testName = "incrementalStatus";
- LOG.info("Testing " + testName);
- String dbName = testName + "_" + tid;
-
- run("CREATE DATABASE " + dbName);
+ String name = testName.getMethodName();
+ String dbName = createDB(name);
advanceDumpDir();
run("REPL DUMP " + dbName);
String lastReplDumpLocn = getResult(0, 0);
@@ -1012,6 +1579,13 @@ public class TestReplicationScenarios {
}
+ private static String createDB(String name) {
+ LOG.info("Testing " + name);
+ String dbName = name + "_" + tid;
+ run("CREATE DATABASE " + dbName);
+ return dbName;
+ }
+
@Test
public void testEventFilters(){
// Test testing that the filters introduced by EventUtils are working correctly.
@@ -1031,8 +1605,8 @@ public class TestReplicationScenarios {
// events to those that match the dbname and tblname provided to the filter.
// If the tblname passed in to the filter is null, then it restricts itself
// to dbname-matching alone.
- IMetaStoreClient.NotificationFilter dbTblFilter = EventUtils.getDbTblNotificationFilter(dbname,tblname);
- IMetaStoreClient.NotificationFilter dbFilter = EventUtils.getDbTblNotificationFilter(dbname,null);
+ IMetaStoreClient.NotificationFilter dbTblFilter = new DatabaseAndTableFilter(dbname,tblname);
+ IMetaStoreClient.NotificationFilter dbFilter = new DatabaseAndTableFilter(dbname,null);
assertFalse(dbTblFilter.accept(null));
assertTrue(dbTblFilter.accept(createDummyEvent(dbname, tblname, 0)));
@@ -1049,7 +1623,7 @@ public class TestReplicationScenarios {
// within a range specified.
long evBegin = 50;
long evEnd = 75;
- IMetaStoreClient.NotificationFilter evRangeFilter = EventUtils.getEventBoundaryFilter(evBegin,evEnd);
+ IMetaStoreClient.NotificationFilter evRangeFilter = new EventBoundaryFilter(evBegin,evEnd);
assertTrue(evBegin < evEnd);
assertFalse(evRangeFilter.accept(null));
@@ -1065,9 +1639,9 @@ public class TestReplicationScenarios {
// that match a provided message format
IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
- EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat());
+ new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat());
IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
- EventUtils.restrictByMessageFormat(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+ new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat() + "_bogus");
NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);
assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
@@ -1092,21 +1666,19 @@ public class TestReplicationScenarios {
}
};
- assertTrue(EventUtils.andFilter(yes, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(yes, no).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, no).accept(dummyEvent));
-
- assertTrue(EventUtils.andFilter(yes, yes, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(yes, yes, no).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(yes, no, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(yes, no, no).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, yes, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent));
- assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent));
-
-
+ assertTrue(new AndFilter(yes, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(yes, no).accept(dummyEvent));
+ assertFalse(new AndFilter(no, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(no, no).accept(dummyEvent));
+
+ assertTrue(new AndFilter(yes, yes, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(yes, yes, no).accept(dummyEvent));
+ assertFalse(new AndFilter(yes, no, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(yes, no, no).accept(dummyEvent));
+ assertFalse(new AndFilter(no, yes, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(no, yes, no).accept(dummyEvent));
+ assertFalse(new AndFilter(no, no, yes).accept(dummyEvent));
+ assertFalse(new AndFilter(no, no, no).accept(dummyEvent));
}
private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
@@ -1137,7 +1709,7 @@ public class TestReplicationScenarios {
if (tblName != null){
verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
}
- assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+ assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
return lastReplDumpId;
}
@@ -1152,7 +1724,7 @@ public class TestReplicationScenarios {
run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'");
verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId);
verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
- assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+ assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
return lastReplDumpId;
}
@@ -1174,18 +1746,25 @@ public class TestReplicationScenarios {
return (lastResults.get(rowNum).split("\\t"))[colNum];
}
+ /**
+ * All the results that are read from the hive output will not preserve
+ * case sensitivity and will all be in lower case, hence we will check against
+ * only lower case data values.
+ * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
+ * before assert.
+ */
private void verifyResults(String[] data) throws IOException {
List<String> results = getOutput();
- LOG.info("Expecting {}",data);
- LOG.info("Got {}",results);
- assertEquals(data.length,results.size());
- for (int i = 0; i < data.length; i++){
- assertEquals(data[i],results.get(i));
+ LOG.info("Expecting {}", data);
+ LOG.info("Got {}", results);
+ assertEquals(data.length, results.size());
+ for (int i = 0; i < data.length; i++) {
+ assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
}
}
private List<String> getOutput() throws IOException {
- List<String> results = new ArrayList<String>();
+ List<String> results = new ArrayList<>();
try {
driver.getResults(results);
} catch (CommandNeedRetryException e) {
@@ -1217,6 +1796,18 @@ public class TestReplicationScenarios {
verifyResults(data);
}
+ private void verifyFail(String cmd) throws RuntimeException {
+ boolean success = false;
+ try {
+ success = run(cmd,false);
+ } catch (AssertionError ae){
+ LOG.warn("AssertionError:",ae);
+ throw new RuntimeException(ae);
+ }
+
+ assertFalse(success);
+ }
+
private static void run(String cmd) throws RuntimeException {
try {
run(cmd,false); // default arg-less run simply runs, and does not care about failure
@@ -1246,7 +1837,7 @@ public class TestReplicationScenarios {
return success;
}
- public static void createTestDataFile(String filename, String[] lines) throws IOException {
+ private static void createTestDataFile(String filename, String[] lines) throws IOException {
FileWriter writer = null;
try {
File file = new File(filename);
@@ -1261,5 +1852,4 @@ public class TestReplicationScenarios {
}
}
}
-
}