Posted to commits@hive.apache.org by sa...@apache.org on 2019/04/16 13:19:22 UTC
[hive] branch master updated: HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7c35779 HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)
7c35779 is described below
commit 7c35779cceeea72d00611a4b25808eeb2f3fdc31
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Tue Apr 16 18:48:58 2019 +0530
HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 12 +-
.../parse/TestReplicationScenariosAcidTables.java | 639 ++++++++++++++++++---
.../hadoop/hive/ql/parse/WarehouseInstance.java | 2 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 113 +++-
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 17 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 13 +
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 12 +
.../repl/dump/events/AllocWriteIdHandler.java | 14 +
.../parse/repl/dump/events/CommitTxnHandler.java | 25 +-
.../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 4 +-
.../hadoop/hive/metastore/MetaStoreTestUtils.java | 23 +-
11 files changed, 749 insertions(+), 125 deletions(-)
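
For orientation, the flow this patch enables: a replication policy that was originally bootstrapped without ACID tables can, once ACID replication is enabled, bootstrap its ACID/MM tables as part of a single incremental dump. A rough sketch in the style of the tests changed below (the WarehouseInstance helpers primary/replica and the db names mirror the test code; the sequence is illustrative, not part of the patch):

    // 1. Existing policy, bootstrapped without ACID tables (test-only config).
    WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName, null,
        Collections.singletonList("'hive.repl.dump.include.acid.tables'='false'"));
    replica.load(replicatedDbName, bootstrap.dumpLocation);
    // 2. After enabling ACID replication, set the new config once via the WITH
    //    clause so this incremental dump also bootstraps the ACID/MM tables.
    WarehouseInstance.Tuple inc = primary.dump(primaryDbName, bootstrap.lastReplicationId,
        Arrays.asList("'hive.repl.dump.include.acid.tables'='true'",
            "'hive.repl.bootstrap.acid.tables'='true'"));
    replica.load(replicatedDbName, inc.dumpLocation);
    // 3. Subsequent incremental dumps run as usual, without the bootstrap config.
    WarehouseInstance.Tuple inc2 = primary.dump(primaryDbName, inc.lastReplicationId);
    replica.load(replicatedDbName, inc2.dumpLocation);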
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f74485b..9d9fdbf 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -470,12 +470,12 @@ public class HiveConf extends Configuration {
REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
"Indicates whether replication dump only metadata information or data + metadata. \n"
+ "This config makes hive.repl.include.external.tables config ineffective."),
- REPL_DUMP_INCLUDE_ACID_TABLES("hive.repl.dump.include.acid.tables", false,
- "Indicates if repl dump should include information about ACID tables. It should be \n"
- + "used in conjunction with 'hive.repl.dump.metadata.only' to enable copying of \n"
- + "metadata for acid tables which do not require the corresponding transaction \n"
- + "semantics to be applied on target. This can be removed when ACID table \n"
- + "replication is supported."),
+ REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false,
+ "Indicates if repl dump should bootstrap the information about ACID tables along with \n"
+ + "incremental dump for replication. It is recommended to keep this config parameter \n"
+ + "as false always and should be set to true only via WITH clause of REPL DUMP \n"
+ + "command. It should be set to true only once for incremental repl dump on \n"
+ + "each of the existing replication policies after enabling acid tables replication."),
REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT("hive.repl.bootstrap.dump.open.txn.timeout", "1h",
new TimeValidator(TimeUnit.HOURS),
"Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. "
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 342985e..aba97ec 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.Behaviour
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
@@ -55,6 +57,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Collections;
import java.util.Map;
@@ -80,6 +83,14 @@ public class TestReplicationScenariosAcidTables {
REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL,
REPL_TEST_ACID_INSERT_UNION
}
+ private static List<String> dumpWithoutAcidClause = Collections.singletonList(
+ "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+ private static List<String> dumpWithAcidBootstrapClause = Arrays.asList(
+ "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+ private List<String> acidTableNames = new LinkedList<>();
+ private List<String> nonAcidTableNames = new LinkedList<>();
+
@BeforeClass
public static void classLevelSetup() throws Exception {
@@ -109,6 +120,7 @@ public class TestReplicationScenariosAcidTables {
put("hive.mapred.mode", "nonstrict");
put("mapred.input.dir.recursive", "true");
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.in.repl.test", "true");
}};
acidEnableConf.putAll(overrides);
@@ -149,10 +161,10 @@ public class TestReplicationScenariosAcidTables {
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
- private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
- return primary.run("use " + primaryDbName)
+ private void prepareAcidData(String primaryDbName) throws Throwable {
+ primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
- "tblproperties (\"transactional\"=\"true\")")
+ "tblproperties (\"transactional\"=\"true\")")
.run("insert into t1 values(1)")
.run("insert into t1 values(2)")
.run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
@@ -165,46 +177,344 @@ public class TestReplicationScenariosAcidTables {
"\"transactional_properties\"=\"insert_only\")")
.run("insert into t3 values(11)")
.run("insert into t3 values(22)")
- .run("create table t4 (id int)")
- .run("insert into t4 values(111), (222)")
.run("create table t5 (id int) stored as orc ")
.run("insert into t5 values(1111), (2222)")
.run("alter table t5 set tblproperties (\"transactional\"=\"true\")")
- .run("insert into t5 values(3333)")
- .dump(primaryDbName, fromReplId);
+ .run("insert into t5 values(3333)");
+ acidTableNames.add("t1");
+ acidTableNames.add("t2");
+ acidTableNames.add("t3");
+ acidTableNames.add("t5");
}
- private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
+ private void prepareNonAcidData(String primaryDbName) throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("create table t4 (id int)")
+ .run("insert into t4 values(111), (222)");
+ nonAcidTableNames.add("t4");
+ }
+ private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId,
+ List<String> withClause) throws Throwable {
+ prepareAcidData(primaryDbName);
+ prepareNonAcidData(primaryDbName);
+ return primary.run("use " + primaryDbName)
+ .dump(primaryDbName, fromReplId, withClause != null ?
+ withClause : Collections.emptyList());
+ }
+
+ private void verifyNonAcidTableLoad(String replicatedDbName) throws Throwable {
+ replica.run("use " + replicatedDbName)
+ .run("select id from t4 order by id")
+ .verifyResults(new String[] {"111", "222"});
+ }
+
+ private void verifyAcidTableLoad(String replicatedDbName) throws Throwable {
replica.run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"})
- .run("repl status " + replicatedDbName)
- .verifyResult(lastReplId)
.run("select id from t1 order by id")
.verifyResults(new String[]{"1", "2"})
.run("select country from t2 order by country")
.verifyResults(new String[] {"france", "india", "us"})
.run("select rank from t3 order by rank")
.verifyResults(new String[] {"11", "22"})
- .run("select id from t4 order by id")
- .verifyResults(new String[] {"111", "222"})
.run("select id from t5 order by id")
.verifyResults(new String[] {"1111", "2222", "3333"});
}
+ private void verifyLoadExecution(String replicatedDbName, String lastReplId, boolean includeAcid)
+ throws Throwable {
+ List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+ if (includeAcid) {
+ tableNames.addAll(acidTableNames);
+ }
+ replica.run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(tableNames)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(lastReplId);
+ verifyNonAcidTableLoad(replicatedDbName);
+ if (includeAcid) {
+ verifyAcidTableLoad(replicatedDbName);
+ }
+ }
+
@Test
public void testAcidTablesBootstrap() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+ // Bootstrap
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null, null);
replica.load(replicatedDbName, bootstrapDump.dumpLocation);
- verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId);
+ verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
+
+ // First incremental, after bootstrap
+ prepareIncNonAcidData(primaryDbName);
+ prepareIncAcidData(primaryDbName);
+ LOG.info(testName.getMethodName() + ": first incremental dump and load.");
+ WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId);
+ replica.load(replicatedDbName, incDump.dumpLocation);
+ verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
+
+ // Second incremental, after bootstrap
+ prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+ prepareInc2AcidData(primaryDbName, primary.hiveConf);
+ LOG.info(testName.getMethodName() + ": second incremental dump and load.");
+ WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, incDump.lastReplicationId);
+ replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
}
@Test
public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null, null);
replica.load(replicatedDbName, bootstrapDump.dumpLocation,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId);
+ verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
+ }
+
+ private void prepareIncAcidData(String dbName) throws Throwable {
+ primary.run("use " + dbName)
+ .run("create table t6 (str string) stored as orc tblproperties " +
+ "(\"transactional\"=\"true\")")
+ .run("insert into t6 values ('aaa'), ('bbb')")
+ .run("alter table t2 add columns (placetype string)")
+ .run("update t2 set placetype = 'city'");
+ acidTableNames.add("t6");
+ }
+
+ private void verifyIncAcidLoad(String dbName) throws Throwable {
+ replica.run("use " + dbName)
+ .run("select str from t6 order by str")
+ .verifyResults(new String[]{"aaa", "bbb"})
+ .run("select country from t2 order by country")
+ .verifyResults(new String[] {"france", "india", "us"})
+ .run("select distinct placetype from t2")
+ .verifyResult("city")
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1", "2"})
+ .run("select rank from t3 order by rank")
+ .verifyResults(new String[] {"11", "22"})
+ .run("select id from t5 order by id")
+ .verifyResults(new String[] {"1111", "2222", "3333"});
+ }
+
+ private void runUsingDriver(IDriver driver, String command) throws Throwable {
+ CommandProcessorResponse ret = driver.run(command);
+ if (ret.getException() != null) {
+ throw ret.getException();
+ }
+ }
+
+ private void prepareInc2AcidData(String dbName, HiveConf hiveConf) throws Throwable {
+ IDriver driver = DriverFactory.newDriver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ runUsingDriver(driver, "use " + dbName);
+ runUsingDriver(driver, "insert into t1 values (3)");
+ runUsingDriver(driver, "insert into t5 values (4444)");
+ }
+
+ private void verifyInc2AcidLoad(String dbName) throws Throwable {
+ replica.run("use " + dbName)
+ .run("select str from t6 order by str")
+ .verifyResults(new String[]{"aaa", "bbb"})
+ .run("select country from t2 order by country")
+ .verifyResults(new String[] {"france", "india", "us"})
+ .run("select distinct placetype from t2")
+ .verifyResult("city")
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1", "2", "3"})
+ .run("select rank from t3 order by rank")
+ .verifyResults(new String[] {"11", "22"})
+ .run("select id from t5 order by id")
+ .verifyResults(new String[] {"1111", "2222", "3333", "4444"});
+ }
+
+ private void prepareIncNonAcidData(String dbName) throws Throwable {
+ primary.run("use " + dbName)
+ .run("insert into t4 values (333)")
+ .run("create table t7 (str string)")
+ .run("insert into t7 values ('aaa')");
+ nonAcidTableNames.add("t7");
+ }
+
+ private void verifyIncNonAcidLoad(String dbName) throws Throwable {
+ replica.run("use " + dbName)
+ .run("select * from t4 order by id")
+ .verifyResults(new String[] {"111", "222", "333"})
+ .run("select * from t7")
+ .verifyResult("aaa");
+ }
+
+ private void prepareInc2NonAcidData(String dbName, HiveConf hiveConf) throws Throwable {
+ IDriver driver = DriverFactory.newDriver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ runUsingDriver(driver, "use " + dbName);
+ runUsingDriver(driver, "insert into t4 values (444)");
+ runUsingDriver(driver, "insert into t7 values ('bbb')");
+ }
+
+ private void verifyInc2NonAcidLoad(String dbName) throws Throwable {
+ replica.run("use " + dbName)
+ .run("select * from t4 order by id")
+ .verifyResults(new String[] {"111", "222", "333", "444"})
+ .run("select * from t7")
+ .verifyResults(new String[] {"aaa", "bbb"});
+ }
+
+ private void verifyIncLoad(String dbName, String lastReplId)
+ throws Throwable {
+ List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+ tableNames.addAll(acidTableNames);
+ replica.run("use " + dbName)
+ .run("show tables")
+ .verifyResults(tableNames)
+ .run("repl status " + dbName)
+ .verifyResult(lastReplId);
+ verifyIncNonAcidLoad(dbName);
+ verifyIncAcidLoad(dbName);
+ }
+
+ private void verifyInc2Load(String dbName, String lastReplId)
+ throws Throwable {
+ List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+ tableNames.addAll(acidTableNames);
+ replica.run("use " + dbName)
+ .run("show tables")
+ .verifyResults(tableNames)
+ .run("repl status " + dbName)
+ .verifyResult(lastReplId);
+ verifyInc2NonAcidLoad(dbName);
+ verifyInc2AcidLoad(dbName);
+ }
+
+ @Test
+ public void testAcidTablesBootstrapDuringIncremental() throws Throwable {
+ // Take a bootstrap dump without acid tables
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+ dumpWithoutAcidClause);
+ LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
+
+ // Take an incremental dump with acid table bootstrap
+ prepareIncAcidData(primaryDbName);
+ prepareIncNonAcidData(primaryDbName);
+ LOG.info(testName.getMethodName() + ": incremental dump and load with acid table bootstrap.");
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+ replica.load(replicatedDbName, incrementalDump.dumpLocation);
+ verifyIncLoad(replicatedDbName, incrementalDump.lastReplicationId);
+ // Ckpt should be set on bootstrapped tables.
+ replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, incrementalDump.dumpLocation);
+
+ // Take a second normal incremental dump after ACID table bootstrap
+ prepareInc2AcidData(primaryDbName, primary.hiveConf);
+ prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+ LOG.info(testName.getMethodName()
+ + ": second incremental dump and load after the incremental dump with acid table " +
+ "bootstrap.");
+ WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, incrementalDump.lastReplicationId);
+ replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+ }
+
+ @Test
+ public void testRetryAcidTablesBootstrapFromDifferentDump() throws Throwable {
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+ dumpWithoutAcidClause);
+ LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
+
+ prepareIncAcidData(primaryDbName);
+ prepareIncNonAcidData(primaryDbName);
+ LOG.info(testName.getMethodName() + ": first incremental dump with acid table bootstrap.");
+ WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+
+ // Fail setting the ckpt property for table t5 but succeed for the earlier tables
+ BehaviourInjection<CallerArguments, Boolean> callerVerifier
+ = new BehaviourInjection<CallerArguments, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable CallerArguments args) {
+ if (args.tblName.equalsIgnoreCase("t5") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
+ injectionPathCalled = true;
+ LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+ return false;
+ }
+ return true;
+ }
+ };
+
+ // Fail repl load before the ckpt property is set for t4 and after it is set for t2.
+ // In the retry, these half-baked tables should be dropped and the bootstrap should succeed.
+ InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+ try {
+ LOG.info(testName.getMethodName()
+ + ": loading first incremental dump with acid table bootstrap (will fail)");
+ replica.loadFailure(replicatedDbName, incDump.dumpLocation);
+ callerVerifier.assertInjectionsPerformed(true, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetAlterTableModifier();
+ }
+
+ prepareInc2AcidData(primaryDbName, primary.hiveConf);
+ prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+ LOG.info(testName.getMethodName() + ": second incremental dump with acid table bootstrap");
+ WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+
+ // Set an incorrect bootstrap dump to clean tables. Here, the full bootstrap dump is used,
+ // which is invalid, so REPL LOAD fails.
+ List<String> loadWithClause = Collections.singletonList(
+ "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+ + bootstrapDump.dumpLocation + "'");
+ LOG.info(testName.getMethodName()
+ + ": trying to load second incremental dump with wrong bootstrap dump "
+ + " specified for cleaning ACID tables. Should fail.");
+ replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+
+ // Set the previously failed bootstrap dump for clean-up. Now, the new bootstrap should overwrite the old one.
+ loadWithClause = Collections.singletonList(
+ "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+ + incDump.dumpLocation + "'");
+
+ LOG.info(testName.getMethodName()
+ + ": trying to load second incremental dump with correct bootstrap dump "
+ + "specified for cleaning ACID tables. Should succeed.");
+ replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+ verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+
+ // Once the REPL LOAD is successful, this config should be unset; otherwise, the subsequent
+ // REPL LOAD will also drop those tables, which will cause data loss.
+ loadWithClause = Collections.emptyList();
+
+ // Verify that bootstrapping with the same dump is idempotent and returns the same result
+ LOG.info(testName.getMethodName()
+ + ": trying to load second incremental dump (with acid bootstrap) again."
+ + " Should succeed.");
+ replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+ verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+ }
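
In short, the retry protocol this test exercises: if an incremental load that bootstraps ACID tables fails partway, the retry must point REPL LOAD at the dump whose load failed, via ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG, so the half-loaded tables get dropped and re-bootstrapped; once a load succeeds the config must be dropped again. A minimal sketch, with failedIncDump/retryDump/nextDump standing in for the tuples above:

    // Retry after a partially failed ACID-bootstrap load (sketch, not part of the patch).
    List<String> cleanTablesClause = Collections.singletonList(
        "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
            + failedIncDump.dumpLocation + "'");
    replica.load(replicatedDbName, retryDump.dumpLocation, cleanTablesClause);
    // After a successful load, never carry the config forward: a later load
    // would otherwise drop the bootstrapped tables again and lose data.
    replica.load(replicatedDbName, nextDump.dumpLocation, Collections.emptyList());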
+
+ @Test
+ public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+ dumpWithoutAcidClause);
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+ prepareIncAcidData(primaryDbName);
+ prepareIncNonAcidData(primaryDbName);
+ WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+ WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+ replica.load(replicatedDbName, incDump.dumpLocation);
+
+ // Re-bootstrapping from a different bootstrap dump without the clean tables config should fail.
+ replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, Collections.emptyList(),
+ ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
}
@Test
@@ -212,23 +522,90 @@ public class TestReplicationScenariosAcidTables {
WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null);
replica.load(replicatedDbName, bootstrapDump.dumpLocation,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, bootstrapDump.lastReplicationId);
+ WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName,
+ bootstrapDump.lastReplicationId, null);
replica.load(replicatedDbName, incrDump.dumpLocation,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId);
+ verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
+ }
+
+ private List<Long> openTxns(int numTxns, TxnStore txnHandler, HiveConf primaryConf) throws Throwable {
+ OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(numTxns, "u1", "localhost"));
+ List<Long> txns = otResp.getTxn_ids();
+ String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(numTxns - 1);
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ numTxns, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+ return txns;
+ }
+
+ private void allocateWriteIdsForTables(String primaryDbName, Map<String, Long> tables,
+ TxnStore txnHandler,
+ List<Long> txns, HiveConf primaryConf) throws Throwable {
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
+ rqst.setDbName(primaryDbName);
+
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ rqst.setTableName(entry.getKey());
+ rqst.setTxnIds(txns);
+ txnHandler.allocateTableWriteIds(rqst);
+ }
+ verifyWriteIdsForTables(tables, primaryConf, primaryDbName);
+ }
+
+ private void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String dbName)
+ throws Throwable {
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_TO_WRITE_ID"),
+ entry.getValue().longValue(),
+ TxnDbUtil.countQueryAgent(conf,
+ "select count(*) from TXN_TO_WRITE_ID where t2w_database = '"
+ + dbName.toLowerCase()
+ + "' and t2w_table = '" + entry.getKey() + "'"));
+ }
+ }
+
+ private void verifyAllOpenTxnsAborted(List<Long> txns, HiveConf primaryConf) throws Throwable {
+ int numTxns = txns.size();
+ String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(numTxns - 1);
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ 0, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+ Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+ numTxns, TxnDbUtil.countQueryAgent(primaryConf,
+ "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
+ }
+
+ private void verifyNextId(Map<String, Long> tables, String dbName, HiveConf conf) throws Throwable {
+ // Verify the next write id
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ String[] nextWriteId =
+ TxnDbUtil.queryToString(conf,
+ "select nwi_next from NEXT_WRITE_ID where nwi_database = '"
+ + dbName.toLowerCase() + "' and nwi_table = '"
+ + entry.getKey() + "'").split("\n");
+ Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), entry.getValue() + 1);
+ }
+ }
+
+ private void verifyCompactionQueue(Map<String, Long> tables, String dbName, HiveConf conf)
+ throws Throwable {
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+ entry.getValue().longValue(),
+ TxnDbUtil.countQueryAgent(conf,
+ "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
+ + "' and cq_table = '" + entry.getKey() + "'"));
+ }
}
@Test
public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
- // Open 5 txns
+ int numTxns = 5;
HiveConf primaryConf = primary.getConf();
TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
- OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(5, "u1", "localhost"));
- List<Long> txns = otResp.getTxn_ids();
- String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(4);
- Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
- 5, TxnDbUtil.countQueryAgent(primaryConf,
- "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
// Create 2 tables, one partitioned and the other not. Also, include both full ACID and MM tables.
primary.run("use " + primaryDbName)
@@ -239,21 +616,13 @@ public class TestReplicationScenariosAcidTables {
"\"transactional_properties\"=\"insert_only\")")
.run("insert into t2 partition(name='Bob') values(11)")
.run("insert into t2 partition(name='Carl') values(10)");
+
// Allocate write ids for both tables t1 and t2 for all txns
// t1=5+1(insert) and t2=5+2(insert)
- AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(primaryDbName, "t1");
- rqst.setTxnIds(txns);
- txnHandler.allocateTableWriteIds(rqst);
- rqst.setTableName("t2");
- txnHandler.allocateTableWriteIds(rqst);
- Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
- 6, TxnDbUtil.countQueryAgent(primaryConf,
- "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
- + "' and t2w_table = 't1'"));
- Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
- 7, TxnDbUtil.countQueryAgent(primaryConf,
- "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
- + "' and t2w_table = 't2'"));
+ Map<String, Long> tables = new HashMap<>();
+ tables.put("t1", numTxns+1L);
+ tables.put("t2", numTxns+2L);
+ allocateWriteIdsForTables(primaryDbName, tables, txnHandler, txns, primaryConf);
// Bootstrap dump with open txn timeout as 1s.
List<String> withConfigs = Arrays.asList(
@@ -263,22 +632,8 @@ public class TestReplicationScenariosAcidTables {
.dump(primaryDbName, null, withConfigs);
// After bootstrap dump, all the opened txns should be aborted. Verify it.
- Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
- 0, TxnDbUtil.countQueryAgent(primaryConf,
- "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
- Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
- 5, TxnDbUtil.countQueryAgent(primaryConf,
- "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
-
- // Verify the next write id
- String[] nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
- + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't1'")
- .split("\n");
- Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
- nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
- + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't2'")
- .split("\n");
- Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+ verifyAllOpenTxnsAborted(txns, primaryConf);
+ verifyNextId(tables, primaryDbName, primaryConf);
// Bootstrap load which should also replicate the aborted write ids on both tables.
HiveConf replicaConf = replica.getConf();
@@ -294,35 +649,82 @@ public class TestReplicationScenariosAcidTables {
.verifyResults(new String[] {"10", "11"});
// Verify if HWM is properly set after REPL LOAD
- nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
- + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't1'")
- .split("\n");
- Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
- nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
- + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't2'")
- .split("\n");
- Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+ verifyNextId(tables, replicatedDbName, replicaConf);
+
+ // Verify if all the aborted write ids are replicated to the replicated DB
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ entry.setValue((long) numTxns);
+ }
+ verifyWriteIdsForTables(tables, replicaConf, replicatedDbName);
+
+ // Verify if entries added in COMPACTION_QUEUE for each table/partition
+ // t1-> 1 entry and t2-> 2 entries (1 per partition)
+ tables.clear();
+ tables.put("t1", 1L);
+ tables.put("t2", 2L);
+ verifyCompactionQueue(tables, replicatedDbName, replicaConf);
+ }
+
+ @Test
+ public void testAcidTablesBootstrapDuringIncrementalWithOpenTxnsTimeout() throws Throwable {
+ // Take a dump without ACID tables
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+ dumpWithoutAcidClause);
+ LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+ // Open concurrent transactions, create data for incremental and take an incremental dump
+ // with ACID table bootstrap.
+ int numTxns = 5;
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ // Open 5 txns
+ List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+ prepareIncNonAcidData(primaryDbName);
+ prepareIncAcidData(primaryDbName);
+ // Allocate write ids for tables t1 and t2 for all txns
+ // t1=5+2(insert) and t2=5+5(insert, alter add column)
+ Map<String, Long> tables = new HashMap<>();
+ tables.put("t1", numTxns+2L);
+ tables.put("t2", numTxns+5L);
+ allocateWriteIdsForTables(primaryDbName, tables, txnHandler, txns, primaryConf);
+
+ // Bootstrap dump with open txn timeout as 1s.
+ List<String> withConfigs = new LinkedList<>(dumpWithAcidBootstrapClause);
+ withConfigs.add("'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+ WarehouseInstance.Tuple incDump = primary
+ .run("use " + primaryDbName)
+ .dump(primaryDbName, bootstrapDump.lastReplicationId, withConfigs);
+
+ // After bootstrap dump, all the opened txns should be aborted. Verify it.
+ verifyAllOpenTxnsAborted(txns, primaryConf);
+ verifyNextId(tables, primaryDbName, primaryConf);
+
+ // Incremental load with ACID bootstrap should also replicate the aborted write ids on
+ // tables t1 and t2
+ HiveConf replicaConf = replica.getConf();
+ LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
+ replica.load(replicatedDbName, incDump.dumpLocation);
+ // During an incremental dump with ACID bootstrap we do not dump ALLOC_WRITE_ID events. So the
+ // two ALLOC_WRITE_ID events corresponding to the aborted transactions on t1 and t2 will not be
+ // replicated. Discount those.
+ verifyIncLoad(replicatedDbName,
+ String.valueOf(Long.parseLong(incDump.lastReplicationId) - 2));
+ // Verify if HWM is properly set after REPL LOAD
+ verifyNextId(tables, replicatedDbName, replicaConf);
// Verify if all the aborted write ids are replicated to the replicated DB
- Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
- 5, TxnDbUtil.countQueryAgent(replicaConf,
- "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
- + "' and t2w_table = 't1'"));
- Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
- 5, TxnDbUtil.countQueryAgent(replicaConf,
- "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
- + "' and t2w_table = 't2'"));
+ for(Map.Entry<String, Long> entry : tables.entrySet()) {
+ entry.setValue((long) numTxns);
+ }
+ verifyWriteIdsForTables(tables, replicaConf, replicatedDbName);
// Verify if entries added in COMPACTION_QUEUE for each table/partition
// t1-> 1 entry and t2-> 2 entries (1 per partition)
- Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
- 1, TxnDbUtil.countQueryAgent(replicaConf,
- "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
- + "' and cq_table = 't1'"));
- Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
- 2, TxnDbUtil.countQueryAgent(replicaConf,
- "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
- + "' and cq_table = 't2'"));
+ tables.clear();
+ tables.put("t1", 1L);
+ tables.put("t2", 4L);
+ verifyCompactionQueue(tables, replicatedDbName, replicaConf);
}
@Test
@@ -397,6 +799,83 @@ public class TestReplicationScenariosAcidTables {
}
@Test
+ public void testBootstrapAcidTablesDuringIncrementalWithConcurrentWrites() throws Throwable {
+ // Dump and load bootstrap without ACID tables.
+ WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+ dumpWithoutAcidClause);
+ LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+ // Create incremental data for the incremental load with bootstrap of ACID tables
+ prepareIncNonAcidData(primaryDbName);
+ prepareIncAcidData(primaryDbName);
+ // Perform concurrent writes. Bootstrap won't see the written data, but the subsequent
+ // incremental repl should see it. We cannot inject callerVerifier since an incremental dump
+ // would not cause an ALTER DATABASE event. Instead we piggyback on
+ // getCurrentNotificationEventId(), which is required for a bootstrap anyway.
+ BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId> callerInjectedBehavior
+ = new BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId>() {
+ @Nullable
+ @Override
+ public CurrentNotificationEventId apply(@Nullable CurrentNotificationEventId input) {
+ if (injectionPathCalled) {
+ nonInjectedPathCalled = true;
+ } else {
+ // Do some writes through concurrent thread
+ injectionPathCalled = true;
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Entered new thread");
+ try {
+ prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+ prepareInc2AcidData(primaryDbName, primary.hiveConf);
+ } catch (Throwable t) {
+ Assert.assertNull(t);
+ }
+ LOG.info("Exit new thread success");
+ }
+ });
+ t.start();
+ LOG.info("Created new thread {}", t.getName());
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return input;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerInjectedBehavior);
+ WarehouseInstance.Tuple incDump = null;
+ try {
+ incDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+ callerInjectedBehavior.assertInjectionsPerformed(true, true);
+ } finally {
+ // reset the behaviour
+ InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour();
+ }
+
+ // While bootstrapping ACID tables, the snapshot was taken before the concurrent thread
+ // performed its writes. So the concurrent writes won't be dumped.
+ LOG.info(testName.getMethodName() +
+ ": loading incremental dump containing bootstrapped ACID tables.");
+ replica.load(replicatedDbName, incDump.dumpLocation);
+ verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
+
+ // Next Incremental should include the concurrent writes
+ LOG.info(testName.getMethodName() +
+ ": dumping second normal incremental dump from event id = " + incDump.lastReplicationId);
+ WarehouseInstance.Tuple inc2Dump = primary.dump(primaryDbName, incDump.lastReplicationId);
+ LOG.info(testName.getMethodName() +
+ ": loading second normal incremental dump from event id = " + incDump.lastReplicationId);
+ replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+ }
+
+ @Test
public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable {
HiveConf primaryConf = primary.getConf();
primary.run("use " + primaryDbName)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e9a63f8..aeafe85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -155,7 +155,7 @@ public class WarehouseInstance implements Closeable {
hiveConf.set(entry.getKey(), entry.getValue());
}
- MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
+ MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true, true);
// Add the below mentioned dependency in metastore/pom.xml file. For postgres need to copy postgresql-42.2.1.jar to
// .m2//repository/postgresql/postgresql/9.3-1102.jdbc41/postgresql-9.3-1102.jdbc41.jar.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index eb5c18a..11f8bc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -137,9 +137,60 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Utils.writeOutput(values, new Path(work.resultTempPath), conf);
}
+ /**
+ * Decide whether to examine all the tables to dump. We do this if
+ * 1. External tables are going to be part of the dump: in that case we need to list their
+ * locations.
+ * 2. External or ACID tables are being bootstrapped for the first time: so that we can dump
+ * those tables as a whole.
+ * @return true if all the tables should be examined for this dump.
+ */
+ private boolean shouldDumpExternalTableLocation() {
+ return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
+ || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
+ }
+
+ private boolean shouldExamineTablesToDump() {
+ return shouldDumpExternalTableLocation() ||
+ conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+ }
+
+ private boolean shouldBootstrapDumpTable(Table table) {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) &&
+ TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ return true;
+ }
+
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) &&
+ AcidUtils.isTransactionalTable(table)) {
+ return true;
+ }
+
+ return false;
+ }
+
private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
Long lastReplId; // get list of events matching dbPattern & tblPattern
// go through each event, and dump out each event to a event-level dump dir inside dumproot
+ String validTxnList = null;
+ long waitUntilTime = 0;
+ long bootDumpBeginReplId = -1;
+
+ // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
+ // bootstrap (see bootstrapDump() for more details). The only difference here is that instead
+ // of waiting for the concurrent transactions to finish, we start dumping the incremental
+ // events and wait only for the remaining time, if any.
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+ bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
+ assert (bootDumpBeginReplId >= 0);
+ LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {} and table {}",
+ work.dbNameOrPattern,
+ work.tableNameOrPattern);
+ long timeoutInMs = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ waitUntilTime = System.currentTimeMillis() + timeoutInMs;
+ }
// TODO : instead of simply restricting by message format, we should eventually
// move to a jdbc-driver-style registering of message format, and picking message
@@ -147,7 +198,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// same factory, restricting by message format is effectively a guard against
// older leftover data that would cause us problems.
- work.overrideEventTo(hiveDb);
+ work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
@@ -193,27 +244,36 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
dmd.write();
- // If external tables are enabled for replication and
- // - If bootstrap is enabled, then need to combine bootstrap dump of external tables.
- // - If metadata-only dump is enabled, then shall skip dumping external tables data locations to
- // _external_tables_info file. If not metadata-only, then dump the data locations.
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
- && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
- || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES))) {
+ // If required, wait longer for any transactions that were open at the time of starting the ACID bootstrap.
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+ assert (waitUntilTime > 0);
+ validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+ }
+
+ // Examine all the tables if required.
+ if (shouldExamineTablesToDump()) {
Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
+
try (Writer writer = new Writer(dumpRoot, conf)) {
for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
Table table = hiveDb.getTable(dbName, tableName);
- if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+
+ // Dump external table locations if required.
+ if (shouldDumpExternalTableLocation() &&
+ TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
writer.dataLocationDump(table);
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)) {
- HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
- dumpTable(dbName, tableName, null, dbRoot, 0, hiveDb, tableTuple);
- }
+ }
+
+ // Dump the table to be bootstrapped if required.
+ if (shouldBootstrapDumpTable(table)) {
+ HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+ dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+ tableTuple);
}
}
}
}
+
return lastReplId;
}
@@ -256,8 +316,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
assert (bootDumpBeginReplId >= 0L);
LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern);
+ long timeoutInMs = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
- String validTxnList = getValidTxnListForReplDump(hiveDb);
+ String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
Database db = hiveDb.getDatabase(dbName);
@@ -397,18 +460,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return openTxns;
}
- String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
- // Key design point for REPL DUMP is to not have any txns older than current txn in which dump runs.
- // This is needed to ensure that Repl dump doesn't copy any data files written by any open txns
- // mainly for streaming ingest case where one delta file shall have data from committed/aborted/open txns.
- // It may also have data inconsistency if the on-going txns doesn't have corresponding open/write
- // events captured which means, catch-up incremental phase won't be able to replicate those txns.
- // So, the logic is to wait for configured amount of time to see if all open txns < current txn is
- // getting aborted/committed. If not, then we forcefully abort those txns just like AcidHouseKeeperService.
+ // Get the list of valid transactions for REPL DUMP. Also wait a given amount of time for
+ // open transactions to finish. Abort any transactions still open after the wait is over.
+ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
+ // Key design point for REPL DUMP is to not have any txns older than the current txn in which
+ // the dump runs. This is needed to ensure that repl dump doesn't copy any data files written
+ // by open txns, mainly for the streaming ingest case where one delta file may have data from
+ // committed/aborted/open txns. It may also cause data inconsistency if the on-going txns
+ // don't have corresponding open/write events captured, which means the catch-up incremental
+ // phase won't be able to replicate those txns. So, the logic is to wait for the given amount
+ // of time to see if all open txns < current txn get aborted/committed. If not, then
+ // we forcefully abort those txns just like AcidHouseKeeperService.
ValidTxnList validTxnList = getTxnMgr().getValidTxns();
- long timeoutInMs = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
- long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
while (System.currentTimeMillis() < waitUntilTime) {
// If there are no txns which are open for the given ValidTxnList snapshot, then just return it.
if (getOpenTxns(validTxnList).isEmpty()) {
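
The hunk cuts off mid-loop, but the rewritten comment above describes the complete pattern: keep refreshing the transaction snapshot until either no older txns remain open or the deadline passes, after which the leftovers are force-aborted. A self-contained illustration of that wait-then-abort shape in plain Java (helper names here are stand-ins, not Hive API):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Supplier;

    class OpenTxnDrainSketch {
        // Poll the snapshot until it is clean or the deadline passes; whatever is
        // still open afterwards is returned for the caller to force-abort.
        static List<Long> waitForOpenTxns(Supplier<List<Long>> openTxnsSnapshot,
                long waitUntilTimeMs, long pollIntervalMs) throws InterruptedException {
            List<Long> open = openTxnsSnapshot.get();
            while (!open.isEmpty() && System.currentTimeMillis() < waitUntilTimeMs) {
                Thread.sleep(pollIntervalMs);
                open = openTxnsSnapshot.get(); // refresh, as the valid-txn list is re-fetched
            }
            return open;
        }

        public static void main(String[] args) throws InterruptedException {
            List<Long> open = new CopyOnWriteArrayList<>(List.of(7L, 8L));
            long deadline = System.currentTimeMillis() + 200;
            List<Long> leftovers = waitForOpenTxns(() -> open, deadline, 50);
            System.out.println("force-abort these: " + leftovers); // prints [7, 8]
        }
    }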
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 61fa424..d32be96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -65,7 +65,22 @@ public class ReplDumpWork implements Serializable {
return maxEventLimit;
}
- void overrideEventTo(Hive fromDb) throws Exception {
+ // Override any user specification that changes the last event to be dumped.
+ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception {
+ // If we are bootstrapping ACID tables, we need to dump all the events up to the event id at
+ // the beginning of the bootstrap dump and not dump any event after that. So we override
+ // both the last event and any user-specified limit on the number of events. See
+ // bootstrapDump() for more details.
+ if (bootstrapLastId > 0) {
+ eventTo = bootstrapLastId;
+ maxEventLimit = null;
+ LoggerFactory.getLogger(this.getClass())
+ .debug("eventTo restricted to event id : {} because of bootstrap of ACID tables",
+ eventTo);
+ return;
+ }
+
+ // If no last event is specified, get the current last event id from the metastore.
if (eventTo == null) {
eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId();
LoggerFactory.getLogger(this.getClass())
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index fbdbbdd..f9f13e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -88,6 +88,10 @@ public class ReplUtils {
// duplicate check. Note : Stmt id is not used for base directory now, but to avoid misuse later, its maintained.
public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0;
+ // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't
+ // be used in production, or in tests other than the ones where it's required.
+ public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
+
/**
* Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
*/
@@ -264,4 +268,13 @@ public class ReplUtils {
}
return Long.parseLong(writeIdString);
}
+
+ // Only for testing: we do not include ACID tables in the dump (and hence do not replicate them) if the config says so.
+ public static boolean includeAcidTableInDump(HiveConf conf) {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+ return conf.getBoolean(REPL_DUMP_INCLUDE_ACID_TABLES, true);
+ }
+
+ return true;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 3cac813..b09ec25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -196,6 +197,17 @@ public class Utils {
}
return shouldReplicateExternalTables;
}
+
+ if (AcidUtils.isTransactionalTable(tableHandle.getTTable())) {
+ if (!ReplUtils.includeAcidTableInDump(hiveConf)) {
+ return false;
+ }
+
+ // Skip dumping events related to ACID tables if bootstrap is enabled on them
+ if (isEventDump) {
+ return !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+ }
+ }
}
return true;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index 7602d1f..bd25a6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -35,6 +37,18 @@ class AllocWriteIdHandler extends AbstractEventHandler<AllocWriteIdMessage> {
@Override
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), eventMessageAsJSON);
+
+ // If we are bootstrapping ACID tables during an incremental dump, the events corresponding
+ // to these ACID tables are not dumped. Hence we do not need to allocate any writeId on the
+ // target, and so we do not need to dump these events.
+ if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+ return;
+ }
+
+ if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
+ return;
+ }
+
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(eventMessageAsJSON);
dmd.write();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 620263f..5c2b3d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -107,12 +108,32 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ boolean replicatingAcidEvents = true;
+ if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+ // We do not dump ACID table related events when taking a bootstrap dump of ACID tables as
+ // part of an incremental dump. So we shouldn't dump any changes to ACID tables as part of
+ // the commit. At the same time, we need to dump the commit transaction event so that
+ // replication can end a transaction opened while replaying an open transaction event.
+ LOG.debug("writeEventsInfoList will be removed from commit message because we are " +
+ "bootstrapping acid tables.");
+ replicatingAcidEvents = false;
+ } else if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
+ // Similar to the condition above: for testing purposes only, if the config doesn't allow
+ // ACID tables to be replicated, we don't dump any changes to the ACID tables as part of
+ // the commit.
+ LOG.debug("writeEventsInfoList will be removed from commit message because we are " +
+ "not dumping acid tables.");
+ replicatingAcidEvents = false;
+ }
String contextDbName = withinContext.dbName == null ? null :
StringUtils.normalizeIdentifier(withinContext.dbName);
String contextTableName = withinContext.tableName == null ? null :
StringUtils.normalizeIdentifier(withinContext.tableName);
- List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
- getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
+ List<WriteEventInfo> writeEventInfoList = null;
+ if (replicatingAcidEvents) {
+ writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
+ getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
+ }
int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
if (numEntry != 0) {
eventMessage.addWriteEventInfo(writeEventInfoList);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 39f342f..7a58dba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -78,7 +78,7 @@ public class TestReplDumpTask {
}
@Override
- String getValidTxnListForReplDump(Hive hiveDb) {
+ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) {
return "";
}
@@ -116,6 +116,8 @@ public class TestReplDumpTask {
when(queryState.getConf()).thenReturn(conf);
when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
+ when(HiveConf.getVar(conf,
+ HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index efbcfb2..412314c 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -106,10 +106,11 @@ public class MetaStoreTestUtils {
return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
}
- public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri)
+ public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri,
+ boolean keepWarehousePath)
throws Exception {
return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf,
- keepJdbcUri);
+ keepJdbcUri, keepWarehousePath);
}
public static int startMetaStoreWithRetry() throws Exception {
@@ -119,7 +120,7 @@ public class MetaStoreTestUtils {
public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
Configuration conf) throws Exception {
- return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false);
+ return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false);
}
/**
@@ -130,20 +131,24 @@ public class MetaStoreTestUtils {
* @param bridge The Thrift bridge to use
* @param conf The configuration to use
* @param keepJdbcUri If set to true, then the JDBC url is not changed
+ * @param keepWarehousePath If set to true, then the Warehouse directory is not changed
* @return The port on which the MetaStore finally started
* @throws Exception
*/
public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
- Configuration conf, boolean keepJdbcUri) throws Exception {
+ Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath) throws Exception {
Exception metaStoreException = null;
String warehouseDir = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
for (int tryCount = 0; tryCount < MetaStoreTestUtils.RETRY_COUNT; tryCount++) {
try {
int metaStorePort = findFreePort();
- // Setting metastore instance specific warehouse directory, postfixing with port
- Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
- MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, postfixedWarehouseDir.toString());
+ if (!keepWarehousePath) {
+ // Setting metastore instance specific warehouse directory, postfixing with port
+ Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
+ MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, postfixedWarehouseDir.toString());
+ warehouseDir = postfixedWarehouseDir.toString();
+ }
String jdbcUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
if (!keepJdbcUri) {
@@ -167,11 +172,11 @@ public class MetaStoreTestUtils {
fs.mkdirs(wh.getWhRoot());
fs.setPermission(wh.getWhRoot(),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- LOG.info("MetaStore warehouse root dir ({}) is created", postfixedWarehouseDir);
+ LOG.info("MetaStore warehouse root dir ({}) is created", warehouseDir);
}
LOG.info("MetaStore Thrift Server started on port: {} with warehouse dir: {} with " +
- "jdbcUrl: {}", metaStorePort, postfixedWarehouseDir, jdbcUrl);
+ "jdbcUrl: {}", metaStorePort, warehouseDir, jdbcUrl);
return metaStorePort;
} catch (ConnectException ce) {
metaStoreException = ce;