You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/07/20 04:24:08 UTC

[hive] branch master updated: HIVE-23474: Deny Repl Dump if the database is a target of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 63dd09c  HIVE-23474: Deny Repl Dump if the database is a target of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
63dd09c is described below

commit 63dd09c941c9372e9fb30a50d5567d4a86255a2a
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Jul 20 09:53:56 2020 +0530

    HIVE-23474: Deny Repl Dump if the database is a target of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/ql/ErrorMsg.java   |  3 ++-
 .../parse/TestReplicationScenariosAcidTables.java  | 22 ++++++++++++++++++++++
 .../TestReplicationScenariosAcrossInstances.java   |  4 +++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  6 ++++++
 .../ql/exec/repl/bootstrap/load/LoadDatabase.java  |  3 +++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   | 12 ++++++++++++
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  4 ++++
 7 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 4b63653..d943412 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -615,7 +615,8 @@ public enum ErrorMsg {
   //========================== 40000 range starts here ========================//
 
   SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
-  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true)
+  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true),
+  REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.")
   ;
 
   private int errorCode;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 71326ec..bf8a00d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -1895,6 +1895,28 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
   }
 
+  @Test
+  public void testReplTargetOfReplication() throws Throwable {
+    // Bootstrap
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+    replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName);
+    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
+
+    //Try to do a dump on replicated db. It should fail
+    replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')");
+    try {
+      replica.dump(replicatedDbName);
+    } catch (Exception e) {
+      Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage());
+    }
+    replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')");
+
+    //Try to dump a different db on replica. That should succeed
+    replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')")
+      .dump(replicatedDbName + "_extra");
+    replica.run("drop database if exists " + replicatedDbName + "_extra cascade");
+  }
+
   private void verifyPathExist(FileSystem fs, Path filePath) throws IOException {
     assertTrue("Path not found:" + filePath, fs.exists(filePath));
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index d7b360c..60074ae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -969,7 +970,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     replica.load(replicatedDbName, primaryDbName); // first successful incremental load.
 
     // Bootstrap Repl B -> C
-    WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName);
+    WarehouseInstance.Tuple tupleReplica = replica.run("alter database " + replicatedDbName
+      + " set dbproperties ('" + TARGET_OF_REPLICATION + "' = '')").dump(replicatedDbName);
     String replDbFromReplica = replicatedDbName + "_dupe";
     replica.load(replDbFromReplica, replicatedDbName)
             .run("use " + replDbFromReplica)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 498d59c..0a7d5a0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -468,6 +468,12 @@ public class WarehouseInstance implements Closeable {
     assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY));
   }
 
+  public void verifyTargetOfReplProperty(String dbName) throws Exception {
+    Database db = getDatabase(dbName);
+    assertTrue(db.getParameters().containsKey(ReplUtils.TARGET_OF_REPLICATION));
+    assertTrue(Boolean.getBoolean(db.getParameters().get(ReplUtils.TARGET_OF_REPLICATION)));
+  }
+
   public WarehouseInstance verifyReplTargetProperty(String dbName, List<String> tblNames) throws Exception {
     for (String tblName : tblNames) {
       verifyReplTargetProperty(getTable(dbName, tblName).getParameters());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 85e9add..1444e15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -156,6 +156,9 @@ public class LoadDatabase {
     // done for this database or not. If compaction is done before first incremental then duplicate check will fail as
     // compaction may change the directory structure.
     parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true");
+    //This flag will be set to identify that it is a target of replication. Repl dump won't be allowed on a database
+    //which is a target of replication.
+    parameters.put(ReplUtils.TARGET_OF_REPLICATION, "true");
 
     return parameters;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index bccf56a..eaa6690 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
@@ -130,6 +131,8 @@ public class ReplUtils {
   public static final String RANGER_HIVE_SERVICE_NAME = "ranger.plugin.hive.service.name";
 
   public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml";
+
+  public static final String TARGET_OF_REPLICATION = "repl.target.for";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -210,6 +213,15 @@ public class ReplUtils {
     return false;
   }
 
+  public static boolean isTargetOfReplication(Database db) {
+    assert (db != null);
+    Map<String, String> m = db.getParameters();
+    if ((m != null) && (m.containsKey(TARGET_OF_REPLICATION))) {
+      return !StringUtils.isEmpty(m.get(TARGET_OF_REPLICATION));
+    }
+    return false;
+  }
+
   public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat)
           throws SemanticException {
     String val = hiveConf.get(configParam);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index ed358f3..5e3f3a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -219,6 +219,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                   " as it is not a source of replication (repl.source.for)");
           throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getMsg());
         }
+        if (ReplUtils.isTargetOfReplication(database)) {
+          LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a target of replication (repl.target.for)");
+          throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg());
+        }
       } else {
         throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist");
       }