Posted to commits@hive.apache.org by pv...@apache.org on 2022/06/09 07:41:13 UTC

[hive] branch master updated: HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bd8e4052066 HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
bd8e4052066 is described below

commit bd8e4052066e0ea9294defd6d4e87094c667b846
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Thu Jun 9 13:11:05 2022 +0530

    HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 13 +++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  5 +--
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     | 50 +++++++++++++++++++---
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5ccd74f3708..5bd6ac3d362 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -753,11 +753,13 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
     // rest.
+
+    assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
     WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
 
     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
     // _bootstrap directory should be created as bootstrap enabled on external tables.
-    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName);
     FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
     ArrayList<String> tablesBootstrapped = new ArrayList<String>();
     for (FileStatus file : listStatus) {
@@ -769,6 +771,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
     primary.load(primaryDbName, replicatedDbName, withClause);
 
+    assertFalse("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
+
     primary.run("use " + primaryDbName)
         .run("select id from t1")
         .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
@@ -898,6 +902,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Check the properties on the new target database.
     assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
+    assertTrue(targetParams.containsKey(CURR_STATE_ID_TARGET.toString()));
+    assertTrue(targetParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
     assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
 
     // Check the properties on the new source database.
@@ -1096,7 +1102,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
     primary.run("use " + primaryDbName).run("create table tnew_managed (id int)")
         .run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)")
-        .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3");
+        .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3")
+        .run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')");
+
+    assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
 
     // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
     replica.run("use " + replicatedDbName).run("insert into table t1 values (101)")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bc141943131..b76354eb459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -245,9 +245,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
               // Get the last replicated event id from the database with respect to target.
               String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
-              // Check if the tableDiff directory is present or not.
-              boolean isTableDiffDirectoryPresent =
-                  checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
 
               LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
               lastReplId =
@@ -274,6 +271,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               // Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
               createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
 
+              dumpDbMetadata(work.dbNameOrPattern, new Path(hiveDumpRoot, EximUtil.METADATA_PATH_NAME),
+                      fromEventId, getHive());
               // Call the normal dump with the tablesForBootstrap set.
               lastReplId =  incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
             }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 096fe24face..2ef04e2a306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
+import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
 import org.apache.thrift.TException;
@@ -722,15 +724,15 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
       boolean isTableDiffPresent =
           checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
-      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
       if (!isTableDiffPresent) {
+        Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
         prepareTableDiffFile(eventId, getHive(), work, conf);
-        if (this.childTasks == null) {
-          this.childTasks = new ArrayList<>();
-        }
-        createReplLoadCompleteAckTask();
-        return 0;
       }
+      if (this.childTasks == null) {
+        this.childTasks = new ArrayList<>();
+      }
+      createReplLoadCompleteAckTask();
+      return 0;
     } else if (work.isSecondFailover) {
       // DROP the tables extra on target, which are not on source cluster.
 
@@ -739,6 +741,27 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
         LOG.info("Dropping table {} for optimised bootstarap", work.dbNameToLoadIn + "." + table);
         db.dropTable(work.dbNameToLoadIn + "." + table, true);
       }
+      Database sourceDb = getSourceDbMetadata();  //This sourceDb was the actual target prior to failover.
+      Map<String, String> sourceDbProps = sourceDb.getParameters();
+      Map<String, String> targetDbProps = new HashMap<>(targetDb.getParameters());
+      for (String key : MetaStoreUtils.getReplicationDbProps()) {
+        //Replication Props will be handled separately as part of preAckTask.
+        targetDbProps.remove(key);
+      }
+      for (Map.Entry<String, String> currProp : targetDbProps.entrySet()) {
+        String actualVal = sourceDbProps.get(currProp.getKey());
+        if (!currProp.getValue().equals(actualVal)) {
+          props.put(currProp.getKey(), (actualVal == null) ? "" : actualVal);
+        }
+      }
+      AlterDatabaseSetOwnerDesc alterDbDesc = new AlterDatabaseSetOwnerDesc(sourceDb.getName(),
+              new PrincipalDesc(sourceDb.getOwnerName(), sourceDb.getOwnerType()), null);
+      DDLWork ddlWork = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc, true,
+              (new Path(work.dumpDirectory)).getParent().toString(), work.getMetricCollector());
+      if (this.childTasks == null) {
+        this.childTasks = new ArrayList<>();
+      }
+      this.childTasks.add(TaskFactory.get(ddlWork, conf));
     }
     if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
       props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
@@ -825,4 +848,19 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime);
     return 0;
   }
+
+  private Database getSourceDbMetadata() throws IOException, SemanticException {
+    Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME);
+    BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn,
+            true, conf, work.getMetricCollector());
+    if (!itr.hasNext()) {
+      throw new SemanticException("Unable to find source db metadata in " + dbMetadata.toString());
+    }
+    BootstrapEvent next = itr.next();
+    if (!next.eventType().equals(BootstrapEvent.EventType.Database)) {
+      throw new SemanticException("Invalid eventType: " + next.eventType() + " encountered while fetching " +
+              "source db metadata from " + dbMetadata.toString());
+    }
+    return ((DatabaseEvent) next).dbInMetadata(work.dbNameToLoadIn);
+  }
 }
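
For readers skimming the patch, the rule applied in ReplLoadTask during the second failover load is: every non-replication database property that differs between the database on the original source and the metadata dumped from the new source gets overwritten, and a property missing from the dump is cleared to an empty value. Below is a minimal standalone sketch of that reconciliation rule, not the committed code; the class and method names (DbPropsReconciler, reconcileDbProps) are hypothetical and chosen only for illustration.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class DbPropsReconciler {
      /**
       * Returns the property updates needed so that the database on the original
       * source (currentProps) matches the metadata dumped from the new source
       * (dumpedProps). Replication-control properties are skipped because the
       * load handles them separately as part of its pre-ack task.
       */
      static Map<String, String> reconcileDbProps(Map<String, String> currentProps,
                                                  Map<String, String> dumpedProps,
                                                  Set<String> replicationProps) {
        Map<String, String> updates = new HashMap<>();
        for (Map.Entry<String, String> entry : currentProps.entrySet()) {
          if (replicationProps.contains(entry.getKey())) {
            continue; // managed by the replication flow itself
          }
          String expected = dumpedProps.get(entry.getKey());
          if (!entry.getValue().equals(expected)) {
            // A key absent on the dumped side is cleared by setting it to "",
            // mirroring the behaviour in the patch above.
            updates.put(entry.getKey(), expected == null ? "" : expected);
          }
        }
        return updates;
      }
    }

The dump side feeds this by writing the source database metadata under EximUtil.METADATA_PATH_NAME (see the ReplDumpTask hunk), which the load side reads back through BootstrapEventsIterator before computing the overrides and resetting the owner via AlterDatabaseSetOwnerDesc.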