Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/02/03 05:40:44 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #2980: HIVE-25895: Bootstrap tables in table_diff during Incremental Load.

pkumarsinha commented on a change in pull request #2980:
URL: https://github.com/apache/hive/pull/2980#discussion_r798225837



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -70,6 +71,8 @@
   /** event ack file which contains the event id till which the cluster was last loaded. */
   public static final String EVENT_ACK_FILE = "event_ack";
 
+  public static final String BOOTSTRAP_TABLES_LIST = "bootstrap_table_list";

Review comment:
       What would happen in the rollback case, e.g. we initiated the failover but aborted the process midway? Theoretically, up to what point will we allow that to happen?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (210),(321)")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t3 values (11)")
+        .run("insert into table t4 partition(country='india') values ('lucknow')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
+
+    // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
+    // rest.
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // _bootstrap directory should be created as bootstrap enabled on external tables.
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    FileStatus[] listStatus =
+        dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
+    ArrayList<String> tablesBootstrapped = new ArrayList<String>();
+    for (FileStatus file : listStatus) {
+      tablesBootstrapped.add(file.getPath().getName());
+    }
+
+    assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
+
+    // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    primary.run("use " + primaryDbName)
+        .run("select id from t1")
+        .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
+        .run("select place from t2 where country = 'india'")
+        .verifyResults(new String[] { "delhi", "chennai" })
+        .run("select place from t2 where country = 'france'")
+        .verifyFailure(new String[] { "lyon" })
+        .run("select id from t3")
+        .verifyResults(new String[] { "10", "20", "31", "42", "11" })
+        .run("select place from t4 where country = 'india'")
+        .verifyResults(new String[] { "bangalore", "lucknow" })
+        .run("select place from t5 where country = 'china'")
+        .verifyResults(new String[] { "beejing" });
+
+    // Check for correct db Properties set.
+
+    Map<String, String> targetParams = primary.getDatabase(primaryDbName).getParameters();
+    Map<String, String> sourceParams = replica.getDatabase(replicatedDbName).getParameters();
+
+    // Check the properties on the new target database.
+    assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
+    assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
+
+    // Check the properties on the new source database.
+    assertFalse(sourceParams.containsKey(TARGET_OF_REPLICATION));
+    assertFalse(sourceParams.containsKey(CURR_STATE_ID_TARGET.toString()));
+    assertFalse(sourceParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
+    assertFalse(sourceParams.containsKey(REPL_TARGET_DB_PROPERTY));
+    assertTrue(sourceParams.containsKey(SOURCE_OF_REPLICATION));
+
+    // Proceed with normal incremental flow, post optimised bootstrap is over.
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (98)")
+        .run("insert into table t2 partition(country='england') values ('london')")
+        .run("insert into table t2 partition(country='india') values ('jaipur')")
+        .run("insert into table t3 values (15),(16)")
+        .run("drop table t4")
+        .run("insert into table t5 partition(country='china') values ('chengdu')")
+        .dump(replicatedDbName, withClause);
+
+    // Do load and check if the data gets loaded.
+    primary.load(primaryDbName, replicatedDbName, withClause)
+        .run("select id from t1")
+        .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321", "98" })
+        .run("select place from t2 where country = 'england'")
+        .verifyResults(new String[] { "london" })
+        .run("select place from t2 where country = 'india'")
+        .verifyResults(new String[] { "delhi", "chennai", "jaipur" })
+        .run("select id from t3")
+        .verifyResults(new String[] { "10", "20", "31", "42", "11", "15", "16" })
+        .run("show tables like 't4'")
+        .verifyFailure(new String[]{"t4"})
+        .run("select place from t5 where country = 'china'")
+        .verifyResults(new String[] { "beejing", "chengdu" });
+  }

Review comment:
       Add or extend a test for the failback scenario as well. Is anything currently missing that would prevent a successful failback?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (210),(321)")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t3 values (11)")
+        .run("insert into table t4 partition(country='india') values ('lucknow')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
+
+    // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
+    // rest.
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // _bootstrap directory should be created as bootstrap enabled on external tables.
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    FileStatus[] listStatus =
+        dumpPath1.getFileSystem(conf).listStatus(dumpPath1);

Review comment:
       nit: move this onto the previous line.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -70,6 +71,8 @@
   /** event ack file which contains the event id till which the cluster was last loaded. */
   public static final String EVENT_ACK_FILE = "event_ack";
 
+  public static final String BOOTSTRAP_TABLES_LIST = "bootstrap_table_list";

Review comment:
       Can we rename this file to _failover_bootstrap_table_list?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (210),(321)")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t3 values (11)")
+        .run("insert into table t4 partition(country='india') values ('lucknow')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
+
+    // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
+    // rest.
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // _bootstrap directory should be created as bootstrap enabled on external tables.
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    FileStatus[] listStatus =
+        dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
+    ArrayList<String> tablesBootstrapped = new ArrayList<String>();
+    for (FileStatus file : listStatus) {
+      tablesBootstrapped.add(file.getPath().getName());
+    }
+
+    assertTrue(tablesBootstrapped.containsAll(Arrays.asList("t1", "t2", "t3")));
+
+    // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    primary.run("use " + primaryDbName)
+        .run("select id from t1")
+        .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
+        .run("select place from t2 where country = 'india'")
+        .verifyResults(new String[] { "delhi", "chennai" })
+        .run("select place from t2 where country = 'france'")
+        .verifyFailure(new String[] { "lyon" })
+        .run("select id from t3")
+        .verifyResults(new String[] { "10", "20", "31", "42", "11" })
+        .run("select place from t4 where country = 'india'")
+        .verifyResults(new String[] { "bangalore", "lucknow" })
+        .run("select place from t5 where country = 'china'")
+        .verifyResults(new String[] { "beejing" });
+
+    // Check for correct db Properties set.
+
+    Map<String, String> targetParams = primary.getDatabase(primaryDbName).getParameters();
+    Map<String, String> sourceParams = replica.getDatabase(replicatedDbName).getParameters();
+
+    // Check the properties on the new target database.
+    assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
+    assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
+
+    // Check the properties on the new source database.
+    assertFalse(sourceParams.containsKey(TARGET_OF_REPLICATION));
+    assertFalse(sourceParams.containsKey(CURR_STATE_ID_TARGET.toString()));
+    assertFalse(sourceParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
+    assertFalse(sourceParams.containsKey(REPL_TARGET_DB_PROPERTY));
+    assertTrue(sourceParams.containsKey(SOURCE_OF_REPLICATION));
+
+    // Proceed with normal incremental flow, post optimised bootstrap is over.
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (98)")
+        .run("insert into table t2 partition(country='england') values ('london')")
+        .run("insert into table t2 partition(country='india') values ('jaipur')")
+        .run("insert into table t3 values (15),(16)")
+        .run("drop table t4")
+        .run("insert into table t5 partition(country='china') values ('chengdu')")
+        .dump(replicatedDbName, withClause);
+
+    // Do load and check if the data gets loaded.
+    primary.load(primaryDbName, replicatedDbName, withClause)
+        .run("select id from t1")
+        .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321", "98" })
+        .run("select place from t2 where country = 'england'")
+        .verifyResults(new String[] { "london" })
+        .run("select place from t2 where country = 'india'")
+        .verifyResults(new String[] { "delhi", "chennai", "jaipur" })
+        .run("select id from t3")
+        .verifyResults(new String[] { "10", "20", "31", "42", "11", "15", "16" })
+        .run("show tables like 't4'")
+        .verifyFailure(new String[]{"t4"})
+        .run("select place from t5 where country = 'china'")
+        .verifyResults(new String[] { "beejing", "chengdu" });
+  }

Review comment:
       Add a few negative test cases too, e.g. the bootstrap fails in the middle of an iteration and can recover from it.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)

Review comment:
       Doesn't the deny-all policy kick in by default at this point?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (210),(321)")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t3 values (11)")
+        .run("insert into table t4 partition(country='india') values ('lucknow')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);

Review comment:
       Would mkdirs fail if the dir already exists?
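
       For context on the question above: Hadoop's FileSystem.mkdirs is documented as having roughly `mkdir -p` semantics, so an already-existing directory should not be an error. The sketch below is only a local-filesystem analogy using java.nio (Files.createDirectories), not the Hadoop API and not code from this PR; the class name, method name, and directory names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration: local-filesystem analogy for an idempotent mkdirs.
public class MkdirsIdempotenceSketch {

  /** Creates the same directory twice and reports whether it exists afterwards. */
  public static boolean secondCreateSucceeds() {
    try {
      // Stand-in for the repl dir used in the test; the names are made up.
      Path base = Files.createTempDirectory("repl-sketch");
      Path newReplDir = base.resolve("repldir1");

      // First call creates the directory.
      Files.createDirectories(newReplDir);
      // Second call finds it already present and does not throw.
      Files.createDirectories(newReplDir);

      return Files.isDirectory(newReplDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("second create succeeded: " + secondCreateSucceeds());
  }
}
```

       If the test needs to guard against leftovers from an earlier run, an explicit exists() check or a delete before mkdirs would make that intent visible.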

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -734,4 +743,167 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
     }
   }
+
+
+  @Test
+  public void testReverseBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle.
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Create 4 managed tables and do a dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new york')")
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20),(31),(42)")
+        .run("create table t4 (place string) partitioned by (country string)")
+        .run("insert into table t4 partition(country='india') values ('bangalore')")
+        .run("insert into table t4 partition(country='us') values ('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);
+
+
+    // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
+    primary.run("use " + primaryDbName)
+        .run("create table tnew_managed (id int)")
+        .run("insert into table t1 values (25)")
+        .run("insert into table tnew_managed values (110)")
+        .run("insert into table t2 partition(country='france') values ('lyon')")
+        .run("drop table t3");
+
+    // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
+    replica.run("use " + replicatedDbName)
+        .run("insert into table t1 values (101)")
+        .run("insert into table t1 values (210),(321)")
+        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t3 values (11)")
+        .run("insert into table t4 partition(country='india') values ('lucknow')")
+        .run("create table t5 (place string) partitioned by (country string)")
+        .run("insert into table t5 partition(country='china') values ('beejing')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");

Review comment:
       Is re-using the same repl dir not supported?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


