Posted to commits@hive.apache.org by rb...@apache.org on 2020/06/10 12:19:32 UTC
[hive] branch master updated: HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a42a329 HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)
a42a329 is described below
commit a42a3298baff86926a260e244a2a65da859e9b38
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Wed Jun 10 17:49:14 2020 +0530
HIVE-23520: REPL: repl dump could add support for immutable dataset (Rajesh Balamohan, reviewed by Aasha Medhi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 ++
.../parse/TestReplicationScenariosAcidTables.java | 28 +++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 7 ++++
.../repl/bootstrap/load/table/LoadPartitions.java | 42 ++++++++++++++++++++++
4 files changed, 80 insertions(+)
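
In short: with the new flag on, repl dump skips the data copy tasks and the dumped metadata keeps pointing at the original table and partition locations, which is only safe when the source dataset is immutable. A minimal sketch of enabling it, assuming a HiveConf instance handed to the REPL driver (srcdb is a placeholder):

    import org.apache.hadoop.hive.conf.HiveConf;

    // Programmatic form; the ConfVars entry is added in HiveConf.java below.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY, true);

    // WITH-clause form, as exercised by the new test in this commit:
    //   REPL DUMP srcdb WITH ('hive.repl.dump.skip.immutable.data.copy'='true')
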
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8cdb2eb..24174ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -478,6 +478,9 @@ public class HiveConf extends Configuration {
REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
"Indicates whether replication dump only metadata information or data + metadata. \n"
+ "This config makes hive.repl.include.external.tables config ineffective."),
+ REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY("hive.repl.dump.skip.immutable.data.copy", false,
+ "Indicates whether replication dump can skip copyTask and refer to \n"
+ + " original path instead. This would retain all table and partition meta"),
REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table",
false,
"Indicates whether external table replication dump only metadata information or data + metadata"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 2eab45d..1cdaa8f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -54,6 +55,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Collections;
import java.util.Map;
@@ -176,6 +178,32 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
}
@Test
+ /**
+ * Testcase for getting immutable dataset dump, and its corresponding repl load.
+ */
+ public void testAcidTablesBootstrapWithMetadataAlone() throws Throwable {
+ List<String> withClauseOptions = new LinkedList<>();
+ withClauseOptions.add("'hive.repl.dump.skip.immutable.data.copy'='true'");
+
+ prepareDataAndDump(primaryDbName, withClauseOptions);
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions);
+ verifyAcidTableLoadWithoutData(replicatedDbName);
+ }
+
+ private void verifyAcidTableLoadWithoutData(String replicatedDbName) throws Throwable {
+ replica.run("use " + replicatedDbName)
+ // no data should be there.
+ .run("select id from t1 order by id")
+ .verifyResults(new String[] {})
+ // all 4 partitions should be there
+ .run("show partitions t2")
+ .verifyResults(new String[] {"country=france", "country=india", "country=italy", "country=us"})
+ // no data should be there.
+ .run("select place from t2")
+ .verifyResults(new String[] {});
+ }
+
+ @Test
public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
int numTxns = 5;
HiveConf primaryConf = primary.getConf();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 59cae6b..bccaf94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -164,6 +165,9 @@ public class ReplDumpWork implements Serializable {
}
public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
+ return Collections.emptyList();
+ }
List<Task<?>> tasks = new ArrayList<>();
while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) {
DirCopyWork dirCopyWork = dirCopyIterator.next();
@@ -176,6 +180,9 @@ public class ReplDumpWork implements Serializable {
}
public List<Task<?>> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
+ return Collections.emptyList();
+ }
List<Task<?>> tasks = new ArrayList<>();
while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
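
Both copy-task builders now consult the flag before touching their iterators, so when it is on neither external-table dir copies nor managed-table copies are scheduled. Returning Collections.emptyList() keeps the guard allocation-free, since it hands back a shared immutable singleton; a tiny self-contained illustration of that property:

    import java.util.Collections;
    import java.util.List;

    // Collections.emptyList() always returns the same immutable instance,
    // so the early return above costs no allocation. Callers may only read it:
    List<String> empty = Collections.emptyList();
    // empty.add("task"); // would throw UnsupportedOperationException
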
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index b78df44..025457b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -61,9 +62,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -159,7 +163,45 @@ public class LoadPartitions {
);
}
+ private boolean isMetaDataOp() {
+ return HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY) ||
+ HiveConf.getBoolVar(context.hiveConf, REPL_DUMP_METADATA_ONLY);
+ }
+
+ /**
+ * Get all partitions and consolidate them into single partition request.
+ * Also, copy relevant stats and other information from original request.
+ *
+ * @throws SemanticException
+ */
+ private void addConsolidatedPartitionDesc() throws Exception {
+ List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new LinkedList<>();
+ for (AlterTableAddPartitionDesc alterTableAddPartitionDesc : event.partitionDescriptions(tableDesc)) {
+
+ AlterTableAddPartitionDesc.PartitionDesc src = alterTableAddPartitionDesc.getPartitions().get(0);
+
+ partitions.add(new AlterTableAddPartitionDesc.PartitionDesc(
+ src.getPartSpec(), src.getLocation(), src.getPartParams(), src.getInputFormat(),
+ src.getOutputFormat(), src.getNumBuckets(), src.getCols(), src.getSerializationLib(),
+ src.getSerdeParams(), src.getBucketCols(), src.getSortCols(), src.getColStats(),
+ src.getWriteId()));
+ }
+ AlterTableAddPartitionDesc consolidatedPartitionDesc = new AlterTableAddPartitionDesc(tableDesc.getDatabaseName(),
+ tableDesc.getTableName(), true, partitions);
+
+ addPartition(false, consolidatedPartitionDesc, null);
+ if (partitions.size() > 0) {
+ LOG.info("Added {} partitions", partitions.size());
+ }
+ }
+
private TaskTracker forNewTable() throws Exception {
+ if (isMetaDataOp()) {
+ // Place all partitions in single task to reduce load on HMS.
+ addConsolidatedPartitionDesc();
+ return tracker;
+ }
+
Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
while (iterator.hasNext() && tracker.canAddMoreTasks()) {
AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
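
The consolidation above trades one metastore interaction per partition for a single batched AlterTableAddPartitionDesc covering them all, which is why the comment in forNewTable talks about reducing load on HMS. A self-contained sketch of the same fold in plain Java, with hypothetical stand-in types instead of the Hive classes:

    import java.util.LinkedList;
    import java.util.List;

    public class ConsolidatePartitionsSketch {
      // Stand-in for AlterTableAddPartitionDesc.PartitionDesc (hypothetical).
      static class PartitionDesc {
        final String spec;
        PartitionDesc(String spec) { this.spec = spec; }
      }
      // Stand-in for AlterTableAddPartitionDesc: each source event carries one
      // partition; the consolidated request carries them all.
      static class AddPartitionRequest {
        final List<PartitionDesc> partitions;
        AddPartitionRequest(List<PartitionDesc> partitions) { this.partitions = partitions; }
      }

      static AddPartitionRequest consolidate(List<AddPartitionRequest> perPartition) {
        List<PartitionDesc> all = new LinkedList<>();
        for (AddPartitionRequest req : perPartition) {
          all.add(req.partitions.get(0)); // mirrors getPartitions().get(0) above
        }
        return new AddPartitionRequest(all); // one batched request instead of N
      }

      public static void main(String[] args) {
        List<AddPartitionRequest> events = new LinkedList<>();
        for (String c : new String[] {"country=france", "country=india", "country=italy", "country=us"}) {
          events.add(new AddPartitionRequest(List.of(new PartitionDesc(c))));
        }
        System.out.println(consolidate(events).partitions.size()); // prints 4
      }
    }
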