You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/06/10 12:19:32 UTC

[hive] branch master updated: HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a42a329  HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)
a42a329 is described below

commit a42a3298baff86926a260e244a2a65da859e9b38
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Jun 10 17:49:14 2020 +0530

    HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  3 ++
 .../parse/TestReplicationScenariosAcidTables.java  | 28 +++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  7 ++++
 .../repl/bootstrap/load/table/LoadPartitions.java  | 42 ++++++++++++++++++++++
 4 files changed, 80 insertions(+)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8cdb2eb..24174ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -478,6 +478,9 @@ public class HiveConf extends Configuration {
     REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
         "Indicates whether replication dump only metadata information or data + metadata. \n"
           + "This config makes hive.repl.include.external.tables config ineffective."),
+    REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY("hive.repl.dump.skip.immutable.data.copy", false,
+        "Indicates whether replication dump can skip copyTask and refer to the \n"
+            + "original path instead. This would retain all table and partition metadata."),
     REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table",
             false,
             "Indicates whether external table replication dump only metadata information or data + metadata"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 2eab45d..1cdaa8f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -54,6 +55,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Collections;
 import java.util.Map;
@@ -176,6 +178,32 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   }
 
   @Test
+  /**
+   * Dumps and loads with 'hive.repl.dump.skip.immutable.data.copy'='true', then checks the replica has no rows.
+   */
+  public void testAcidTablesBootstrapWithMetadataAlone() throws Throwable {
+    List<String> withClauseOptions = new LinkedList<>();
+    withClauseOptions.add("'hive.repl.dump.skip.immutable.data.copy'='true'");
+    // The same WITH clause options are passed to both prepareDataAndDump and replica.load.
+    prepareDataAndDump(primaryDbName, withClauseOptions);
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions);
+    verifyAcidTableLoadWithoutData(replicatedDbName);
+  }
+
+  private void verifyAcidTableLoadWithoutData(String replicatedDbName) throws Throwable {
+    replica.run("use " + replicatedDbName)
+        // t1 is expected to return no rows on the replica.
+        .run("select id from t1 order by id")
+        .verifyResults(new String[] {})
+        // t2 is expected to list all four country partitions.
+        .run("show partitions t2")
+        .verifyResults(new String[] {"country=france", "country=india", "country=italy", "country=us"})
+        // t2 is expected to return no rows even though its partitions are listed.
+        .run("select place from t2")
+        .verifyResults(new String[] {});
+  }
+
+  @Test
   public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
     int numTxns = 5;
     HiveConf primaryConf = primary.getConf();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 59cae6b..bccaf94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -164,6 +165,9 @@ public class ReplDumpWork implements Serializable {
   }
 
   public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
+      return Collections.emptyList();
+    }
     List<Task<?>> tasks = new ArrayList<>();
     while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) {
       DirCopyWork dirCopyWork = dirCopyIterator.next();
@@ -176,6 +180,9 @@ public class ReplDumpWork implements Serializable {
   }
 
   public List<Task<?>> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
+      return Collections.emptyList();
+    }
     List<Task<?>> tasks = new ArrayList<>();
     while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
       EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index b78df44..025457b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -61,9 +62,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -159,7 +163,45 @@ public class LoadPartitions {
     );
   }
 
+  private boolean isMetaDataOp() { // true when either skip-immutable-data-copy or metadata-only is enabled
+    boolean skipDataCopy = HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY);
+    return skipDataCopy || HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_METADATA_ONLY);
+  }
+
+  /**
+   * Collects the partitions from event.partitionDescriptions(tableDesc) into one AlterTableAddPartitionDesc
+   * and submits it through a single addPartition call. For each source partition the spec, location,
+   * parameters, formats, bucketing/sort columns, serde settings, column stats and write id are copied.
+   *
+   * @throws Exception propagated from the calls made here (the signature declares Exception, not SemanticException)
+   */
+  private void addConsolidatedPartitionDesc() throws Exception {
+    List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new LinkedList<>();
+    for (AlterTableAddPartitionDesc alterTableAddPartitionDesc : event.partitionDescriptions(tableDesc)) {
+      // Only the first partition of each description is copied - presumably each holds exactly one; TODO confirm.
+      AlterTableAddPartitionDesc.PartitionDesc src = alterTableAddPartitionDesc.getPartitions().get(0);
+
+      partitions.add(new AlterTableAddPartitionDesc.PartitionDesc(
+          src.getPartSpec(), src.getLocation(), src.getPartParams(), src.getInputFormat(),
+          src.getOutputFormat(), src.getNumBuckets(), src.getCols(), src.getSerializationLib(),
+          src.getSerdeParams(), src.getBucketCols(), src.getSortCols(), src.getColStats(),
+          src.getWriteId()));
+    }
+    AlterTableAddPartitionDesc consolidatedPartitionDesc = new AlterTableAddPartitionDesc(tableDesc.getDatabaseName(),
+        tableDesc.getTableName(), true, partitions);
+    // NOTE(review): addPartition is invoked even when no partitions were collected - confirm this is intended.
+    addPartition(false, consolidatedPartitionDesc, null);
+    if (partitions.size() > 0) {
+      LOG.info("Added {} partitions", partitions.size());
+    }
+  }
+
   private TaskTracker forNewTable() throws Exception {
+    if (isMetaDataOp()) {
+      // Place all partitions in single task to reduce load on HMS.
+      addConsolidatedPartitionDesc();
+      return tracker;
+    }
+
     Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();