You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@hive.apache.org by pv...@apache.org on 2022/06/09 07:41:13 UTC
[hive] branch master updated: HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bd8e4052066 HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
bd8e4052066 is described below
commit bd8e4052066e0ea9294defd6d4e87094c667b846
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Thu Jun 9 13:11:05 2022 +0530
HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
---
.../parse/TestReplicationOptimisedBootstrap.java | 13 +++++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +--
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 50 +++++++++++++++++++---
3 files changed, 57 insertions(+), 11 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5ccd74f3708..5bd6ac3d362 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -753,11 +753,13 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
// rest.
+
+ assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// _bootstrap directory should be created as bootstrap enabled on external tables.
- Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+ Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName);
FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
ArrayList<String> tablesBootstrapped = new ArrayList<String>();
for (FileStatus file : listStatus) {
@@ -769,6 +771,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
primary.load(primaryDbName, replicatedDbName, withClause);
+ assertFalse("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
+
primary.run("use " + primaryDbName)
.run("select id from t1")
.verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
@@ -898,6 +902,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Check the properties on the new target database.
assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
+ assertTrue(targetParams.containsKey(CURR_STATE_ID_TARGET.toString()));
+ assertTrue(targetParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
// Check the properties on the new source database.
@@ -1096,7 +1102,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
primary.run("use " + primaryDbName).run("create table tnew_managed (id int)")
.run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)")
- .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3");
+ .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3")
+ .run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')");
+
+ assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
// Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
replica.run("use " + replicatedDbName).run("insert into table t1 values (101)")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bc141943131..b76354eb459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -245,9 +245,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
// Get the last replicated event id from the database with respect to target.
String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
- // Check if the tableDiff directory is present or not.
- boolean isTableDiffDirectoryPresent =
- checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
lastReplId =
@@ -274,6 +271,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
+ dumpDbMetadata(work.dbNameOrPattern, new Path(hiveDumpRoot, EximUtil.METADATA_PATH_NAME),
+ fromEventId, getHive());
// Call the normal dump with the tablesForBootstrap set.
lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 096fe24face..2ef04e2a306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
+import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
import org.apache.thrift.TException;
@@ -722,15 +724,15 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
boolean isTableDiffPresent =
checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
- Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
if (!isTableDiffPresent) {
+ Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
prepareTableDiffFile(eventId, getHive(), work, conf);
- if (this.childTasks == null) {
- this.childTasks = new ArrayList<>();
- }
- createReplLoadCompleteAckTask();
- return 0;
}
+ if (this.childTasks == null) {
+ this.childTasks = new ArrayList<>();
+ }
+ createReplLoadCompleteAckTask();
+ return 0;
} else if (work.isSecondFailover) {
// DROP the tables extra on target, which are not on source cluster.
@@ -739,6 +741,27 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
LOG.info("Dropping table {} for optimised bootstarap", work.dbNameToLoadIn + "." + table);
db.dropTable(work.dbNameToLoadIn + "." + table, true);
}
+ Database sourceDb = getSourceDbMetadata(); //This sourceDb was the actual target prior to failover.
+ Map<String, String> sourceDbProps = sourceDb.getParameters();
+ Map<String, String> targetDbProps = new HashMap<>(targetDb.getParameters());
+ for (String key : MetaStoreUtils.getReplicationDbProps()) {
+ //Replication Props will be handled separately as part of preAckTask.
+ targetDbProps.remove(key);
+ }
+ for (Map.Entry<String, String> currProp : targetDbProps.entrySet()) {
+ String actualVal = sourceDbProps.get(currProp.getKey());
+ if (!currProp.getValue().equals(actualVal)) {
+ props.put(currProp.getKey(), (actualVal == null) ? "" : actualVal);
+ }
+ }
+ AlterDatabaseSetOwnerDesc alterDbDesc = new AlterDatabaseSetOwnerDesc(sourceDb.getName(),
+ new PrincipalDesc(sourceDb.getOwnerName(), sourceDb.getOwnerType()), null);
+ DDLWork ddlWork = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc, true,
+ (new Path(work.dumpDirectory)).getParent().toString(), work.getMetricCollector());
+ if (this.childTasks == null) {
+ this.childTasks = new ArrayList<>();
+ }
+ this.childTasks.add(TaskFactory.get(ddlWork, conf));
}
if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
@@ -825,4 +848,19 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime);
return 0;
}
+
+ private Database getSourceDbMetadata() throws IOException, SemanticException {
+ Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME);
+ BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn,
+ true, conf, work.getMetricCollector());
+ if (!itr.hasNext()) {
+ throw new SemanticException("Unable to find source db metadata in " + dbMetadata.toString());
+ }
+ BootstrapEvent next = itr.next();
+ if (!next.eventType().equals(BootstrapEvent.EventType.Database)) {
+ throw new SemanticException("Invalid eventType: " + next.eventType() + " encountered while fetching " +
+ "source db metadata from " + dbMetadata.toString());
+ }
+ return ((DatabaseEvent) next).dbInMetadata(work.dbNameToLoadIn);
+ }
}