Posted to commits@hive.apache.org by an...@apache.org on 2020/10/23 03:39:07 UTC
[hive] branch master updated: HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e033ebe HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
e033ebe is described below
commit e033ebe3e37357244fe35d3ff9f080742b4e1b5f
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Fri Oct 23 09:08:53 2020 +0530
HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 16 +-
.../hive/ql/parse/TestReplicationScenarios.java | 10 +-
.../parse/TestReplicationScenariosAcidTables.java | 266 +++++++++++----------
.../TestReplicationScenariosAcrossInstances.java | 64 +++--
.../partition/add/AlterTableAddPartitionDesc.java | 7 +
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 81 ++-----
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 2 +
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 17 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 3 +-
.../exec/repl/bootstrap/events/PartitionEvent.java | 5 +
.../events/filesystem/FSPartitionEvent.java | 12 +
.../exec/repl/bootstrap/load/ReplicationState.java | 18 ++
.../repl/bootstrap/load/table/LoadPartitions.java | 228 ++++++++----------
.../exec/repl/bootstrap/load/table/LoadTable.java | 6 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 48 ++--
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 39 ---
.../TestPrimaryToReplicaResourceFunction.java | 2 +-
.../metastore/InjectableBehaviourObjectStore.java | 23 ++
18 files changed, 411 insertions(+), 436 deletions(-)
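Before the diff, a minimal sketch of the batching idea this change introduces for managed tables during bootstrap load. This is illustrative only, not Hive code: copyPartitionData and registerBatch are hypothetical stand-ins. The point is that data for a batch of partitions is copied first, and then a single metastore call registers the whole batch.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the batching behaviour (not Hive code):
// data for a batch of partitions is copied first, then one metastore call
// registers the whole batch. copyPartitionData/registerBatch are hypothetical.
public class PartitionBatchingSketch {
  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("country=india", "country=france", "country=australia");
    int batchSize = 2; // analogous to hive.repl.load.partitions.with.data.copy.batch.size
    for (int i = 0; i < partitions.size(); i += batchSize) {
      List<String> batch = partitions.subList(i, Math.min(i + batchSize, partitions.size()));
      copyPartitionData(batch);   // data copy happens before the metadata update
      registerBatch(batch);       // single ALTER TABLE ... ADD PARTITION call per batch
    }
  }

  private static void copyPartitionData(List<String> batch) {
    batch.forEach(p -> System.out.println("copy data for " + p));
  }

  private static void registerBatch(List<String> batch) {
    System.out.println("register batch of " + batch.size() + " partition(s) in the metastore");
  }
}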
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aab4913..edaa75b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -596,18 +596,6 @@ public class HiveConf extends Configuration {
+ "'hive.repl.include.external.tables' when sets to true. If 'hive.repl.include.external.tables' is \n"
+ "set to false, then this config parameter has no effect. It should be set to true only once for \n"
+ "incremental repl dump on each existing replication policy after enabling external tables replication."),
- REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false,
- "If its set to true, REPL LOAD copies data files directly to the target table/partition location \n"
- + "instead of copying to staging directory first and then move to target location. This optimizes \n"
- + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
- + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
- + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
- REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
- "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
- + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
- + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n "
- + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n"
- + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", null,
"This is the fully qualified base directory on the target/replica warehouse under which data for "
+ "external tables is stored. This is relative base path and hence prefixed to the source "
@@ -655,6 +643,10 @@ public class HiveConf extends Configuration {
"Provide the maximum number of partitions of a table that will be batched together during \n"
+ "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
+ "The data for these partitions will be copied before copying the metadata batch. "),
+ REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE("hive.repl.load.partitions.with.data.copy.batch.size",1000,
+ "Provide the maximum number of partitions of a table that will be batched together during \n"
+ + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
+ + "The data for these partitions will be copied before copying the metadata batch. "),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index bef0a95..27e97b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -475,7 +475,6 @@ public class TestReplicationScenarios {
private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
Tuple tuple) throws Throwable {
HiveConf confTemp = new HiveConf();
- confTemp.set("hive.repl.enable.move.optimization", "true");
Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
ReplicationMetricCollector metricCollector;
if (isIncrementalDump) {
@@ -4022,8 +4021,7 @@ public class TestReplicationScenarios {
String replDbName = dbName + "_replica";
Tuple dump = replDumpDb(dbName);
- run("REPL LOAD " + dbName + " INTO " + replDbName +
- " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror);
run(" use " + replDbName, driverMirror);
@@ -4056,8 +4054,7 @@ public class TestReplicationScenarios {
verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
Tuple incrementalDump = replDumpDb(dbName);
- run("REPL LOAD " + dbName + " INTO " + replDbName +
- " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -4073,8 +4070,7 @@ public class TestReplicationScenarios {
verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);
incrementalDump = replDumpDb(dbName);
- run("REPL LOAD " + dbName + " INTO " + replDbName +
- " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 575eeab..3d22770 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.Collections;
import java.util.Map;
+
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.junit.Assert.assertEquals;
@@ -161,25 +162,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
}
@Test
- public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, primaryDbName,
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
- }
-
- @Test
- public void testAcidTablesMoveOptimizationIncremental() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName,
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, primaryDbName,
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
- verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
- }
-
- @Test
/**
* Testcase for getting immutable dataset dump, and its corresponding repl load.
*/
@@ -2301,131 +2283,155 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
//Distcp copy
List<String> withClause = Arrays.asList(
- "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
- "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
- "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
- "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
- "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
- + UserGroupInformation.getCurrentUser().getUserName() + "'");
+ "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
- .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
- .run("insert into t1 values (1)")
- .run("insert into t1 values (11)")
- .run("insert into t2 values (2)")
- .run("insert into t2 values (22)")
- .run("insert into t3 values (33)")
- .run("insert into tpart1 partition(name='Tom') values(100)")
- .run("insert into tpart1 partition(name='Jerry') values(101)")
- .run("insert into tpart2 partition(name='Bob') values(200)")
- .run("insert into tpart2 partition(name='Carl') values(201)")
- .run("insert into text1 values ('ricky')")
- .dump(primaryDbName, withClause);
+ .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (11)")
+ .run("insert into t2 values (2)")
+ .run("insert into t2 values (22)")
+ .run("insert into t3 values (33)")
+ .run("insert into tpart1 partition(name='Tom') values(100)")
+ .run("insert into tpart1 partition(name='Jerry') values(101)")
+ .run("insert into tpart2 partition(name='Bob') values(200)")
+ .run("insert into tpart2 partition(name='Carl') values(201)")
+ .run("insert into text1 values ('ricky')")
+ .dump(primaryDbName, withClause);
replica.run("DROP TABLE t3");
replica.load(replicatedDbName, primaryDbName, withClause)
- .run("use " + replicatedDbName)
- .run("show tables")
- .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
- .run("select * from " + replicatedDbName + ".t1")
- .verifyResults(new String[] {"1", "11"})
- .run("select * from " + replicatedDbName + ".t2")
- .verifyResults(new String[]{"2", "22"})
- .run("select a from " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"100", "101"})
- .run("show partitions " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"name=Tom", "name=Jerry"})
- .run("select a from " + replicatedDbName + ".tpart2")
- .verifyResults(new String[]{"200", "201"})
- .run("show partitions " + replicatedDbName + ".tpart2")
- .verifyResults(new String[]{"name=Bob", "name=Carl"})
- .run("select a from " + replicatedDbName + ".text1")
- .verifyResults(new String[]{"ricky"});
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[]{"1", "11"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky"});
WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
- .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
- " stored as orc TBLPROPERTIES ('transactional'='true')")
- .run("insert into t1 values (111)")
- .run("insert into t2 values (222)")
- .run("insert into t4 values (4)")
- .run("insert into tpart1 partition(name='Tom') values(102)")
- .run("insert into tpart1 partition(name='Jerry') values(103)")
- .run("insert into tpart2 partition(name='Bob') values(202)")
- .run("insert into tpart2 partition(name='Carl') values(203)")
- .run("insert into tpart3 partition(name='Tom3') values(300)")
- .run("insert into tpart3 partition(name='Jerry3') values(301)")
- .run("insert into tpart4 partition(name='Bob4') values(400)")
- .run("insert into tpart4 partition(name='Carl4') values(401)")
- .run("insert into text1 values ('martin')")
- .dump(primaryDbName, withClause);
+ .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("insert into t1 values (111)")
+ .run("insert into t2 values (222)")
+ .run("insert into t4 values (4)")
+ .run("insert into tpart1 partition(name='Tom') values(102)")
+ .run("insert into tpart1 partition(name='Jerry') values(103)")
+ .run("insert into tpart2 partition(name='Bob') values(202)")
+ .run("insert into tpart2 partition(name='Carl') values(203)")
+ .run("insert into tpart3 partition(name='Tom3') values(300)")
+ .run("insert into tpart3 partition(name='Jerry3') values(301)")
+ .run("insert into tpart4 partition(name='Bob4') values(400)")
+ .run("insert into tpart4 partition(name='Carl4') values(401)")
+ .run("insert into text1 values ('martin')")
+ .dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause)
- .run("use " + replicatedDbName)
- .run("show tables ")
- .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
- .run("select * from " + replicatedDbName + ".t1")
- .verifyResults(new String[] {"1", "11", "111"})
- .run("select * from " + replicatedDbName + ".t2")
- .verifyResults(new String[]{"2", "22", "222"})
- .run("select * from " + replicatedDbName + ".t4")
- .verifyResults(new String[]{"4"})
- .run("select a from " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"100", "101", "102", "103"})
- .run("show partitions " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"name=Tom", "name=Jerry"})
- .run("select a from " + replicatedDbName + ".tpart2")
- .verifyResults(new String[]{"200", "201", "202", "203"})
- .run("show partitions " + replicatedDbName + ".tpart2")
- .verifyResults(new String[]{"name=Bob", "name=Carl"})
- .run("select a from " + replicatedDbName + ".tpart3")
- .verifyResults(new String[]{"300", "301"})
- .run("show partitions " + replicatedDbName + ".tpart3")
- .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
- .run("select a from " + replicatedDbName + ".tpart4")
- .verifyResults(new String[]{"400", "401"})
- .run("show partitions " + replicatedDbName + ".tpart4")
- .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
- .run("select a from " + replicatedDbName + ".text1")
- .verifyResults(new String[]{"ricky", "martin"});
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[]{"1", "11", "111"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222"})
+ .run("select * from " + replicatedDbName + ".t4")
+ .verifyResults(new String[]{"4"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101", "102", "103"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201", "202", "203"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"300", "301"})
+ .run("show partitions " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+ .run("select a from " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"400", "401"})
+ .run("show partitions " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky", "martin"});
incrementalDump = primary.run("use " + primaryDbName)
- .run("insert into t4 values (44)")
- .run("insert into t1 values (1111)")
- .run("DROP TABLE t1")
- .run("insert into t2 values (2222)")
- .run("insert into tpart1 partition(name='Tom') values(104)")
- .run("insert into tpart1 partition(name='Tom_del') values(1000)")
- .run("insert into tpart1 partition(name='Harry') values(10001)")
- .run("insert into tpart1 partition(name='Jerry') values(105)")
- .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
- .run("DROP TABLE tpart2")
- .dump(primaryDbName, withClause);
+ .run("insert into t4 values (44)")
+ .run("insert into t1 values (1111)")
+ .run("DROP TABLE t1")
+ .run("insert into t2 values (2222)")
+ .run("insert into tpart1 partition(name='Tom') values(104)")
+ .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+ .run("insert into tpart1 partition(name='Harry') values(10001)")
+ .run("insert into tpart1 partition(name='Jerry') values(105)")
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+ .run("DROP TABLE tpart2")
+ .dump(primaryDbName, withClause);
replica.run("DROP TABLE t4")
- .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
replica.load(replicatedDbName, primaryDbName, withClause)
- .run("use " + replicatedDbName)
- .run("show tables ")
- .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
- .run("select * from " + replicatedDbName + ".t2")
- .verifyResults(new String[]{"2", "22", "222", "2222"})
- .run("select a from " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
- .run("show partitions " + replicatedDbName + ".tpart1")
- .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222", "2222"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+ }
+
+ @Test
+ public void testTableWithPartitionsInBatch() throws Throwable {
+
+ List<String> withClause = new ArrayList<>();
+ withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='" + 1 + "'");
+
+ primary.run("use " + primaryDbName)
+ .run("create table t2 (place string) partitioned by (country string)")
+ .run("insert into t2 partition(country='india') values ('bangalore')")
+ .run("insert into t2 partition(country='france') values ('paris')")
+ .run("insert into t2 partition(country='australia') values ('sydney')")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't2'")
+ .verifyResults(new String[] { "t2" })
+ .run("select distinct(country) from t2")
+ .verifyResults(new String[] { "india", "france", "australia" })
+ .run("select place from t2")
+ .verifyResults(new String[] { "bangalore", "paris", "sydney" })
+ .verifyReplTargetProperty(replicatedDbName);
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index dbdee9d..e01c6c4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -453,6 +453,34 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
}
@Test
+ public void testMultipleStagesOfReplicationLoadTaskWithPartitionBatching() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into t1 values (1), (2)")
+ .run("create table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='us') values ('austin')")
+ .run("insert into table t2 partition(country='france') values ('paris')")
+ .run("create table t3 (rank int)")
+ .dump(primaryDbName);
+
+ // each table creation itself takes more than one task; given we allow a max of 1, we should hit multiple runs.
+ List<String> withClause = new ArrayList<>();
+ withClause.add("'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+ withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='1'");
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] { "t1", "t2", "t3" })
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("select country from t2 order by country")
+ .verifyResults(new String[] { "france", "india", "us" });
+ }
+
+ @Test
public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
@@ -1337,12 +1365,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk".
// So, table "t2" will exist and partition "india" will exist, rest failed as operation failed.
- BehaviourInjection<List<Partition>, Boolean> alterPartitionStub
+ BehaviourInjection<List<Partition>, Boolean> addPartitionStub
= new BehaviourInjection<List<Partition>, Boolean>() {
@Override
public Boolean apply(List<Partition> ptns) {
for (Partition ptn : ptns) {
- if (ptn.getValues().get(0).equals("india")) {
+ if (ptn.getValues().get(0).equals("uk")) {
injectionPathCalled = true;
LOG.warn("####getPartition Stub called");
return false;
@@ -1351,14 +1379,15 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
return true;
}
};
- InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(alterPartitionStub);
+ InjectableBehaviourObjectStore.setAddPartitionsBehaviour(addPartitionStub);
// Make sure that there's some order in which the objects are loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
- "'hive.in.repl.test.files.sorted'='true'");
+ "'hive.in.repl.test.files.sorted'='true'",
+ "'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE + "' = '1'");
replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
- InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour
- alterPartitionStub.assertInjectionsPerformed(true, false);
+ InjectableBehaviourObjectStore.resetAddPartitionModifier(); // reset the behaviour
+ addPartitionStub.assertInjectionsPerformed(true, false);
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
@@ -1420,21 +1449,19 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
@Test
public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
- List<String> withConfigs =
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("create table t1 (place string) partitioned by (country string)")
.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName, withConfigs);
+ replica.load(replicatedDbName, primaryDbName);
//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
+ LOAD_ACKNOWLEDGEMENT.toString()), true);
- replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
@@ -1448,18 +1475,16 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
@Test
public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
- List<String> withConfigs =
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("ALTER TABLE t2 ADD PARTITION (country='india')")
.dump(primaryDbName);
- replica.load(replicatedDbName, primaryDbName, withConfigs);
+ replica.load(replicatedDbName, primaryDbName);
//delete load ack to reuse the dump
new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true);
- replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
@@ -1472,9 +1497,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
- List<String> withConfigs =
- Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
-
// fail add notification for given event type.
BehaviourInjection<NotificationEvent, Boolean> callerVerifier
= new BehaviourInjection<NotificationEvent, Boolean>() {
@@ -1493,13 +1515,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicaDb, primaryDbName, withConfigs);
+ replica.loadFailure(replicaDb, primaryDbName);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
- replica.load(replicaDb, primaryDbName, withConfigs);
+ replica.load(replicaDb, primaryDbName);
replica.run("use " + replicaDb)
.run("select country from " + tbl + " where country == 'india'")
@@ -1516,13 +1538,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
+ replica.loadFailure(replicatedDbName_CM, primaryDbName);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
- replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName);
replica.run("use " + replicatedDbName_CM)
.run("select country from " + tbl + " where country == 'india'")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
index c4b2dab..d61c575 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.ddl.table.partition.add;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -105,6 +106,12 @@ public class AlterTableAddPartitionDesc implements DDLDescWithWriteId, Serializa
return params;
}
+ public void addPartParams(Map<String, String> partParams) {
+ if (params != null) {
+ params.putAll(partParams);
+ }
+ }
+
@Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getPartParamsForExplain() {
return params.toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3b07b73..57ce1aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -33,10 +32,8 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
-import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
-import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +47,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
@@ -142,7 +138,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
- // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if
+ // in case of acid tables, file is directly copied to destination. So we need to clear the old content, if
// its a replace (insert overwrite ) operation.
if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) {
LOG.debug(" path " + toPath + " is cleaned before renaming");
@@ -226,52 +222,17 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge, boolean needRecycle,
- boolean readSourceAsFileList) {
- return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
- readSourceAsFileList, false);
- }
-
- public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
- HiveConf conf, boolean isAutoPurge, boolean needRecycle,
boolean readSourceAsFileList, String dumpDirectory,
ReplicationMetricCollector metricCollector) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
- readSourceAsFileList, false, dumpDirectory, metricCollector);
+ readSourceAsFileList, false, true, dumpDirectory, metricCollector);
}
- private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
- HiveConf conf, boolean isAutoPurge, boolean needRecycle,
- boolean readSourceAsFileList,
- boolean overWrite) {
- Task<?> copyTask = null;
- LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
- if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
- ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite);
- rcwork.setReadSrcAsFilesList(readSourceAsFileList);
- if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
- rcwork.setDeleteDestIfExist(true);
- rcwork.setAutoPurge(isAutoPurge);
- rcwork.setNeedRecycle(needRecycle);
- }
- // For replace case, duplicate check should not be done. The new base directory will automatically make the older
- // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple
- // replace events getting replayed in the first incremental load.
- rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace());
- LOG.debug("ReplCopyTask:\trcwork");
- String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
- rcwork.setDistCpDoAsUser(distCpDoAsUser);
- copyTask = TaskFactory.get(rcwork, conf);
- } else {
- LOG.debug("ReplCopyTask:\tcwork");
- copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
- }
- return copyTask;
- }
private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge, boolean needRecycle,
boolean readSourceAsFileList,
- boolean overWrite,
+ boolean overWrite, boolean deleteDestination,
String dumpDirectory,
ReplicationMetricCollector metricCollector) {
Task<?> copyTask = null;
@@ -280,7 +241,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite, dumpDirectory,
metricCollector);
rcwork.setReadSrcAsFilesList(readSourceAsFileList);
- if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
+ if (replicationSpec.isReplace() && deleteDestination) {
rcwork.setDeleteDestIfExist(true);
rcwork.setAutoPurge(isAutoPurge);
rcwork.setNeedRecycle(needRecycle);
@@ -300,35 +261,41 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return copyTask;
}
-
- public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
- HiveConf conf) {
- return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
- true, false);
- }
-
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, String dumpDirectory, ReplicationMetricCollector metricCollector) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
- true, false, dumpDirectory, metricCollector);
+ true, false, true, dumpDirectory, metricCollector);
}
-
/*
* Invoked in the bootstrap path.
* Overwrite set to true
*/
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
- HiveConf conf, boolean readSourceAsFileList, boolean overWrite) {
+ HiveConf conf, boolean readSourceAsFileList, boolean overWrite,
+ String dumpDirectory, ReplicationMetricCollector metricCollector) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
- readSourceAsFileList, overWrite);
+ readSourceAsFileList, overWrite, true, dumpDirectory, metricCollector);
}
- public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+ /*
+ * Invoked in the bootstrap dump path. bootstrap dump purge is false.
+ * No purge for dump dir in case of check pointing
+ */
+ public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
HiveConf conf, boolean readSourceAsFileList, boolean overWrite,
- String dumpDirectory, ReplicationMetricCollector metricCollector) {
+ boolean deleteDestination, String dumpDirectory,
+ ReplicationMetricCollector metricCollector) {
+ return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
+ readSourceAsFileList, overWrite, deleteDestination, dumpDirectory, metricCollector);
+ }
+
+
+ public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+ HiveConf conf, String dumpDirectory,
+ ReplicationMetricCollector metricCollector) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
- readSourceAsFileList, overWrite, dumpDirectory, metricCollector);
+ true, false, true, dumpDirectory, metricCollector);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index ea9bf9a..1fce791 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -163,6 +163,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
boolean isBootstrap = (previousValidHiveDumpPath == null);
+ work.setBootstrap(isBootstrap);
//If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
if (shouldDump(previousValidHiveDumpPath)) {
Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
@@ -524,6 +525,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// waiting for the concurrent transactions to finish, we start dumping the incremental events
// and wait only for the remaining time if any.
if (needBootstrapAcidTablesDuringIncrementalDump()) {
+ work.setBootstrap(true);
bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
assert (bootDumpBeginReplId >= 0);
LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 4da5bac..aecfe75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -51,6 +51,7 @@ public class ReplDumpWork implements Serializable {
final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
Long eventTo;
Long eventFrom;
+ private boolean isBootstrap;
private static String testInjectDumpDir = null;
private static boolean testInjectDumpDirAutoIncrement = false;
static boolean testDeletePreviousDumpMetaPath = false;
@@ -134,6 +135,10 @@ public class ReplDumpWork implements Serializable {
}
}
+ void setBootstrap(boolean bootstrap) {
+ isBootstrap = bootstrap;
+ }
+
public void setExternalTblCopyPathIterator(Iterator<String> externalTblCopyPathIterator) {
if (this.externalTblCopyPathIterator != null) {
throw new IllegalStateException("External table copy path iterator has already been initialized");
@@ -204,10 +209,12 @@ public class ReplDumpWork implements Serializable {
replSpec.setInReplicationScope(true);
EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec);
managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ //If it's incremental, in the checkpointing case, the dump dir may exist. We will delete the event dir.
+ //In case of bootstrap checkpointing we will not delete the entire dir and just do a sync
+ Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
- managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite,
- getCurrentDumpPath().toString(), getMetricCollector());
+ managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, !isBootstrap,
+ getCurrentDumpPath().toString(), getMetricCollector());
tasks.add(copyTask);
tracker.addTask(copyTask);
LOG.debug("added task for {}", managedTableCopyPath);
@@ -220,9 +227,9 @@ public class ReplDumpWork implements Serializable {
if (functionCopyPathIterator != null) {
while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf,
- getCurrentDumpPath().toString(), getMetricCollector()
+ getCurrentDumpPath().toString(), getMetricCollector()
);
tasks.add(copyTask);
tracker.addTask(copyTask);
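The comment above captures the checkpointing rule: an incremental dump re-run deletes the stale event dir before copying, while a bootstrap re-run keeps the dump dir and only re-syncs. A minimal sketch of that decision, with illustrative names rather than the Hive API:

// Illustrative only: the deleteDestination flag passed to getDumpCopyTask is the
// negation of isBootstrap, per the comment above. The method name is hypothetical.
public class DumpCheckpointSketch {
  static boolean shouldDeleteDestination(boolean isBootstrap) {
    // bootstrap checkpoint: keep the dump dir and sync; incremental: delete the event dir first
    return !isBootstrap;
  }

  public static void main(String[] args) {
    System.out.println("bootstrap   -> delete destination? " + shouldDeleteDestination(true));
    System.out.println("incremental -> delete destination? " + shouldDeleteDestination(false));
  }
}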
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 16c906c..e7245bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -371,7 +371,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
LoadPartitions loadPartitions =
new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
- event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector());
+ event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector(),
+ event.lastPartSpecReplicated(), event.lastStageReplicated());
/*
the tableTracker here should be a new instance and not an existing one as this can
only happen when we break in between loading partitions.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
index b9d6679..f5ab30c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
@@ -18,9 +18,14 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
public interface PartitionEvent extends TableEvent {
AlterTableAddPartitionDesc lastPartitionReplicated();
+ ReplicationState.PartitionState.Stage lastStageReplicated();
+
+ AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated();
+
TableEvent asTableEvent();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index 2d82408..c7705e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -52,6 +52,18 @@ public class FSPartitionEvent implements PartitionEvent {
}
@Override
+ public ReplicationState.PartitionState.Stage lastStageReplicated() {
+ assert replicationState != null && replicationState.partitionState != null;
+ return replicationState.partitionState.stage;
+ }
+
+ @Override
+ public AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated() {
+ assert replicationState != null && replicationState.partitionState != null;
+ return replicationState.partitionState.partSpec;
+ }
+
+ @Override
public TableEvent asTableEvent() {
return tableEvent;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
index a67184d..d8c995c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
@@ -20,16 +20,34 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
import java.io.Serializable;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.CopyTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
public class ReplicationState implements Serializable {
public static class PartitionState {
final String tableName;
public final AlterTableAddPartitionDesc lastReplicatedPartition;
+ public AlterTableAddPartitionDesc.PartitionDesc partSpec;
+ public Stage stage;
+
+ public enum Stage {
+ COPY,
+ PARTITION
+ }
public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition) {
this.tableName = tableName;
this.lastReplicatedPartition = lastReplicatedPartition;
+ this.stage = Stage.PARTITION;
+ }
+
+ public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition,
+ AlterTableAddPartitionDesc.PartitionDesc lastProcessedPartSpec, Stage stage) {
+ this.tableName = tableName;
+ this.lastReplicatedPartition = lastReplicatedPartition;
+ this.partSpec = lastProcessedPartSpec;
+ this.stage = stage;
}
}
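PartitionState now records both the last processed partition spec and a Stage (COPY or PARTITION), so an interrupted load can resume from the point it stopped rather than from the first partition. A standalone sketch of how such a checkpoint could drive a resume; the Stage values mirror the change above, but the resume loop itself is hypothetical and reflects one plausible reading of the stages:

import java.util.Arrays;
import java.util.List;

// Standalone sketch of resuming from a saved partition checkpoint (not Hive code).
// COPY is read as "data copied, metadata pending"; PARTITION as "partition fully added".
public class ResumeSketch {
  enum Stage { COPY, PARTITION }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("country=india", "country=france", "country=australia");
    String lastProcessed = "country=france"; // checkpointed partSpec from the previous run
    Stage lastStage = Stage.COPY;            // interrupted after data copy, before metadata update

    int resumeIndex = partitions.indexOf(lastProcessed);
    if (lastStage == Stage.PARTITION) {
      resumeIndex++; // metadata already registered; start with the next partition
    }
    for (int i = Math.max(resumeIndex, 0); i < partitions.size(); i++) {
      System.out.println("resume work for " + partitions.get(i));
    }
  }
}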
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index e0c9b96..bcbf20c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitio
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
@@ -38,8 +37,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -50,10 +47,6 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +61,6 @@ import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CHECKPOINT_KEY;
import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -83,6 +75,8 @@ public class LoadPartitions {
private final TableEvent event;
private final TaskTracker tracker;
private final AlterTableAddPartitionDesc lastReplicatedPartition;
+ private final AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc;
+ private final PartitionState.Stage lastReplicatedStage;
private final ReplicationMetricCollector metricCollector;
private final ImportTableDesc tableDesc;
@@ -92,23 +86,26 @@ public class LoadPartitions {
TableEvent event, String dbNameToLoadIn,
TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException {
this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null,
- metricCollector);
+ metricCollector, null, PartitionState.Stage.PARTITION);
}
public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
AlterTableAddPartitionDesc lastReplicatedPartition,
- ReplicationMetricCollector metricCollector) throws HiveException {
+ ReplicationMetricCollector metricCollector,
+ AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
+ ReplicationState.PartitionState.Stage lastReplicatedStage) throws HiveException {
this.tracker = new TaskTracker(limiter);
this.event = event;
this.context = context;
this.replLogger = replLogger;
this.lastReplicatedPartition = lastReplicatedPartition;
this.tableContext = tableContext;
-
this.tableDesc = event.tableDesc(dbNameToLoadIn);
this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
this.metricCollector = metricCollector;
+ this.lastReplicatedPartitionDesc = lastReplicatedPartitionDesc;
+ this.lastReplicatedStage = lastReplicatedStage;
}
public TaskTracker tasks() throws Exception {
@@ -176,11 +173,29 @@ public class LoadPartitions {
* Also, copy relevant stats and other information from original request.
*
* @throws SemanticException
+ * @param lastAlterTableAddPartitionDesc
*/
- private void addConsolidatedPartitionDesc() throws Exception {
+ private void addConsolidatedPartitionDesc(AlterTableAddPartitionDesc lastAlterTableAddPartitionDesc) throws Exception {
+ int maxTasks = 0;
//Load partitions equal to batch size at one go for metadata only and for external tables.
- int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
+ if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
+ } else {
+ maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE);
+ }
int currentPartitionCount = 0;
+ Iterator<AlterTableAddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
+ //If already a set of partitions are processed as part of previous run, we skip those
+ if (lastAlterTableAddPartitionDesc != null) {
+ while (partitionIterator.hasNext()) {
+ currentPartitionCount++;
+ AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
+ if (lastAlterTableAddPartitionDesc.getPartitions().get(0).getPartSpec()
+ .equals(addPartitionDesc.getPartitions().get(0).getPartSpec())) {
+ break;
+ }
+ }
+ }
List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
int totalPartitionCount = partitionDescs.size();
while (currentPartitionCount < totalPartitionCount) {
@@ -207,37 +222,28 @@ public class LoadPartitions {
tableDesc.getTableName(), true, partitions);
//don't need to add ckpt task separately. Added as part of add partition task
- addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null);
- if (partitions.size() > 0) {
- LOG.info("Added {} partitions", partitions.size());
+ addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc);
+ if (!tracker.canAddMoreTasks()) {
+ //No need to do processing as no more tasks can be added. Will be processed in next run. State is already
+ //updated in add partition task
+ return;
}
currentPartitionCount = toPartitionCount;
}
}
private TaskTracker forNewTable() throws Exception {
- if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- // Place all partitions in single task to reduce load on HMS.
- addConsolidatedPartitionDesc();
- return tracker;
- }
-
- Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
- while (iterator.hasNext() && tracker.canAddMoreTasks()) {
- AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
- /*
- the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
- current retrieved lastReplicatedPartition
- */
- addPartition(iterator.hasNext(), currentPartitionDesc, null);
- }
+ // Place all partitions in single task to reduce load on HMS.
+ addConsolidatedPartitionDesc(null);
return tracker;
}
- private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc)
throws Exception {
- tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask));
- if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+ boolean processingComplete = addTasksForPartition(table, addPartitionDesc, null);
+ //If processing is not complete, the replication state has already been updated with the copy tasks that
+ //still need to be processed
+ if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) {
ReplicationState currentReplicationState =
new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
updateReplicationState(currentReplicationState);
@@ -245,113 +251,56 @@ public class LoadPartitions {
}
/**
- * returns the root task for adding a partition
+ * adds copy and add-partition tasks for all partitions in a batch; returns false if the task limit was hit and state was saved for the next run
*/
- private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+ private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc,
+ AlterTableAddPartitionDesc.PartitionDesc lastPartSpec)
throws MetaException, HiveException {
Task<?> addPartTask = TaskFactory.get(
new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
true, (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector),
context.hiveConf
);
- //checkpointing task already added as part of add batch of partition in case for metadata only and external tables
+ //checkpointing task is already added as part of the batched add-partition task
if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- if (ptnRootTask == null) {
- ptnRootTask = addPartTask;
- } else {
- ptnRootTask.addDependentTask(addPartTask);
- }
- return ptnRootTask;
+ tracker.addTask(addPartTask);
+ return true;
}
-
- AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
- Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
- Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
- partSpec.setLocation(replicaWarehousePartitionLocation.toString());
- LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
- + partSpecToString(partSpec.getPartSpec()) + " with source location: "
- + partSpec.getLocation());
- Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
- tableDesc,
- (HashMap<String, String>)partSpec.getPartSpec(),
- context.dumpDirectory,
- this.metricCollector,
- context.hiveConf
- );
-
- Path stagingDir = replicaWarehousePartitionLocation;
- // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
- LoadFileType loadFileType;
- if (event.replicationSpec().isInReplicationScope() &&
- context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
- loadFileType = LoadFileType.IGNORE;
- } else {
- loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
- stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
- }
- boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ //Add a copy task for each partition in the batch
+ boolean lastProcessedStageFound = false;
+ for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
+ if (!tracker.canAddMoreTasks()) {
+ //update the replication state with the partition from which the next run should resume its copy tasks
+ ReplicationState currentReplicationState =
+ new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc,
+ partSpec, PartitionState.Stage.COPY));
+ updateReplicationState(currentReplicationState);
+ return false;
+ }
+ Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+ partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+ LOG.debug("adding dependent CopyWork for partition "
+ + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+ + partSpec.getLocation());
+ if (!lastProcessedStageFound && lastPartSpec != null &&
+ !lastPartSpec.getLocation().equals(partSpec.getLocation())) {
+ //Skip copy tasks that were already processed as part of a previous run
+ continue;
+ }
+ lastProcessedStageFound = true;
+ boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
- new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
- stagingDir,
+ new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())),
+ replicaWarehousePartitionLocation,
context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(),
this.metricCollector
- );
-
- Task<?> movePartitionTask = null;
- if (loadFileType != LoadFileType.IGNORE) {
- // no need to create move task, if file is moved directly to target location.
- movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
- }
-
- if (ptnRootTask == null) {
- ptnRootTask = copyTask;
- } else {
- ptnRootTask.addDependentTask(copyTask);
- }
-
- // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
- // retried for bootstrap, we skip current partition update.
- copyTask.addDependentTask(addPartTask);
- if (movePartitionTask != null) {
- addPartTask.addDependentTask(movePartitionTask);
- movePartitionTask.addDependentTask(ckptTask);
- } else {
- addPartTask.addDependentTask(ckptTask);
- }
- return ptnRootTask;
- }
-
- private String getPartitionName(Path partitionMetadataFullPath) {
- //Get partition name by removing the metadata base path.
- //Needed for getting the data path
- return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
- }
-
- /**
- * This will create the move of partition data from temp path to actual path
- */
- private Task<?> movePartitionTask(Table table, AlterTableAddPartitionDesc.PartitionDesc partSpec, Path tmpPath,
- LoadFileType loadFileType) {
- MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false,
- (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector,
- true);
- if (AcidUtils.isTransactionalTable(table)) {
- LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
- Collections.singletonList(tmpPath),
- Collections.singletonList(new Path(partSpec.getLocation())),
- true, null, null);
- moveWork.setMultiFilesDesc(loadFilesWork);
- } else {
- LoadTableDesc loadTableWork = new LoadTableDesc(
- tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
- loadFileType, 0L
);
- loadTableWork.setInheritTableSpecs(false);
- moveWork.setLoadTableWork(loadTableWork);
+ tracker.addTask(copyTask);
}
- moveWork.setIsInReplicationScope(event.replicationSpec().isInReplicationScope());
- return TaskFactory.get(moveWork, context.hiveConf);
+ //add partition metadata task once all the copy tasks are added
+ tracker.addDependentTask(addPartTask);
+ return true;
}
/**
@@ -415,24 +364,39 @@ public class LoadPartitions {
Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
}
-
+ //Add the copy tasks still pending for the previously replicated partition
+ if (PartitionState.Stage.COPY.equals(lastReplicatedStage)) {
+ addTasksForPartition(table, lastPartitionReplicated,
+ lastReplicatedPartitionDesc);
+ }
+ boolean pendingPartitions = false;
while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
+ pendingPartitions = true;
AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
- Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
- Task<?> ptnRootTask = null;
+ AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0);
+ //Add the checkpoint property as part of the add-partition request
+ Map<String, String> partParams = new HashMap<>();
+ partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory);
+ Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src);
+ src.setLocation(replicaWarehousePartitionLocation.toString());
+ src.addPartParams(partParams);
+ Map<String, String> partSpec = src.getPartSpec();
+
ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
switch (loadPtnType) {
case LOAD_NEW:
break;
case LOAD_REPLACE:
- ptnRootTask = dropPartitionTask(table, partSpec);
+ tracker.addDependentTask(dropPartitionTask(table, partSpec));
break;
case LOAD_SKIP:
continue;
default:
break;
}
- addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask);
+ }
+ if (pendingPartitions) {
+ addConsolidatedPartitionDesc(lastPartitionReplicated);
}
return tracker;
}
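Note: the reworked LoadPartitions above groups a managed table's partitions into consolidated AlterTableAddPartitionDesc batches (sized by REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE), adds one copy task per partition plus one metadata task per batch, and records the last replicated partition (PartitionState.Stage.COPY) when the task tracker fills up so a later run can resume. The following is a minimal, self-contained sketch of that batching-and-resume idea only; the class, method and parameter names here are illustrative and are not the actual Hive classes.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class PartitionBatcher<T> {
      public interface Checkpoint<T> { void save(T lastProcessed); }

      private final int batchSize;   // stands in for REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE
      private final int taskBudget;  // stands in for TaskTracker#canAddMoreTasks

      public PartitionBatcher(int batchSize, int taskBudget) {
        this.batchSize = batchSize;
        this.taskBudget = taskBudget;
      }

      /** Returns true when every partition was handled; false when the task budget ran out. */
      public boolean load(Iterator<T> partitions, T resumeAfter, Checkpoint<T> checkpoint) {
        // Skip everything up to and including the partition recorded by a previous run.
        if (resumeAfter != null) {
          while (partitions.hasNext() && !partitions.next().equals(resumeAfter)) {
            // keep skipping
          }
        }
        int tasksAdded = 0;
        T lastProcessed = null;
        while (partitions.hasNext()) {
          List<T> batch = new ArrayList<>(batchSize);
          while (partitions.hasNext() && batch.size() < batchSize) {
            batch.add(partitions.next());
          }
          for (T partition : batch) {
            if (tasksAdded >= taskBudget) {
              checkpoint.save(lastProcessed); // next run resumes right after this partition
              return false;
            }
            tasksAdded++;                     // stands in for one ReplCopyTask per partition
            lastProcessed = partition;
          }
          // one add-partition (metadata) task per batch follows in the real code
        }
        return true;
      }
    }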
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 11a1036..fb31159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -60,7 +60,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.TreeMap;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
public class LoadTable {
@@ -270,10 +269,9 @@ public class LoadTable {
Path dataPath = fromURI;
Path tmpPath = tgtPath;
- // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
+ // for transactional (ACID) tables, copy the files directly to the target path; no need to create the staging dir.
LoadFileType loadFileType;
- if (replicationSpec.isInReplicationScope() &&
- context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+ if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
loadFileType = LoadFileType.IGNORE;
} else {
loadFileType = (replicationSpec.isReplace())
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5e05c73..0e7209d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -89,7 +89,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
/**
* ImportSemanticAnalyzer.
@@ -437,10 +436,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
boolean isSkipTrash = false;
boolean needRecycle = false;
- if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
- REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
- lft = LoadFileType.IGNORE;
- destPath = loadPath = tgtPath;
+ if (replicationSpec.isInReplicationScope()) {
isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
if (table.isTemporary()) {
needRecycle = false;
@@ -448,27 +444,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
}
+ }
+ if (AcidUtils.isTransactionalTable(table)) {
+ String mmSubdir = replace ? AcidUtils.baseDir(writeId)
+ : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
+ destPath = new Path(tgtPath, mmSubdir);
+ /**
+ * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
+ * directory, i.e. the final destination for these files. This has to be a copy to preserve
+ * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
+ * So setting 'loadPath' this way will make
+ * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean,
+ * boolean, Long, int)}
+ * skip the unnecessary file (rename) operation but it will perform other things.
+ */
+ loadPath = tgtPath;
+ lft = LoadFileType.KEEP_EXISTING;
} else {
- if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
- String mmSubdir = replace ? AcidUtils.baseDir(writeId)
- : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
- destPath = new Path(tgtPath, mmSubdir);
- /**
- * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
- * directory, i.e. the final destination for these files. This has to be a copy to preserve
- * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
- * So setting 'loadPath' this way will make
- * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean,
- * boolean, Long, int)}
- * skip the unnecessary file (rename) operation but it will perform other things.
- */
- loadPath = tgtPath;
- lft = LoadFileType.KEEP_EXISTING;
- } else {
- destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
- lft = replace ? LoadFileType.REPLACE_ALL :
- LoadFileType.OVERWRITE_EXISTING;
- }
+ destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
+ lft = replace ? LoadFileType.REPLACE_ALL :
+ LoadFileType.OVERWRITE_EXISTING;
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -649,8 +644,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadFileType loadFileType;
Path destPath;
- if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
- REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+ if (replicationSpec.isInReplicationScope()) {
loadFileType = LoadFileType.IGNORE;
destPath = tgtLocation;
isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
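Note: with the move-optimization config gone, an import in replication scope now copies transactional-table files straight into their final base/delta directory under the target, so the subsequent load keeps the files in place (LoadFileType.KEEP_EXISTING) rather than renaming them out of a staging dir. A small sketch of how that destination is derived, using only the AcidUtils calls shown in the hunk above; the sample writeId/stmtId values and directory-name strings in the comments are illustrative.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    class AcidImportPathExample {
      // writeId and stmtId are hypothetical here; in the analyzer they come from the import request.
      static Path copyDestination(Path tgtPath, boolean replace, long writeId, int stmtId) {
        String mmSubdir = replace
            ? AcidUtils.baseDir(writeId)                        // roughly "base_0000005" for writeId 5
            : AcidUtils.deltaSubdir(writeId, writeId, stmtId);  // roughly "delta_0000005_0000005_0000"
        // Copying straight into the final base/delta directory means no later move/rename is needed.
        return new Path(tgtPath, mmSubdir);
      }
    }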
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index b73921c..4c10499 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
@@ -48,16 +47,12 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
import java.io.IOException;
import java.net.URI;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
-import java.util.Base64;
import java.util.List;
import java.util.Collections;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
@@ -264,29 +259,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
- if (filePath == null) {
- throw new HiveException("filePath cannot be null");
- }
-
- URI uri = filePath.toUri();
- String scheme = uri.getScheme();
- scheme = StringUtils.isBlank(scheme) ? FileSystem.get(uri, conf).getScheme() : scheme;
- if (StringUtils.isBlank(scheme)) {
- throw new HiveException("Cannot get valid scheme for " + filePath);
- }
-
- LOG.info("scheme is " + scheme);
-
- String[] schmeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(",");
- for (String schemeIter : schmeList) {
- if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) {
- return true;
- }
- }
- return false;
- }
-
// REPL LOAD
private void initReplLoad(ASTNode ast) throws HiveException {
sourceDbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -363,17 +335,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
try {
assert(sourceDbNameOrPattern != null);
Path loadPath = getCurrentLoadPath();
- // Ths config is set to make sure that in case of s3 replication, move is skipped.
- try {
- Warehouse wh = new Warehouse(conf);
- Path filePath = wh.getWhRoot();
- if (ifEnableMoveOptimization(filePath, conf)) {
- conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
- LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
- }
- } catch (Exception e) {
- throw new SemanticException(e.getMessage(), e);
- }
// Now, the dumped path can be one of three things:
// a) It can be a db dump, in which case we expect a set of dirs, each with a
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
index faba6e4..ddf687b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
@@ -92,7 +92,7 @@ public class TestPrimaryToReplicaResourceFunction {
mockStatic(ReplCopyTask.class);
Task mock = mock(Task.class);
when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class),
- any(HiveConf.class))).thenReturn(mock);
+ any(HiveConf.class), any(), any())).thenReturn(mock);
ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR,
"hdfs://localhost:9000/user/someplace/ab.jar#e094828883"));
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index 8673186..b828d32 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -93,6 +93,8 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
private static com.google.common.base.Function<List<Partition>, Boolean> alterPartitionsModifier = null;
+ private static com.google.common.base.Function<List<Partition>, Boolean> addPartitionsModifier = null;
+
// Methods to set/reset getTable modifier
public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier;
@@ -154,10 +156,18 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
setAlterTableModifier(null);
}
+ public static void resetAddPartitionModifier() {
+ setAddPartitionsBehaviour(null);
+ }
+
public static void setAlterPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){
alterPartitionsModifier = modifier;
}
+ public static void setAddPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){
+ addPartitionsModifier = modifier;
+ }
+
// ObjectStore methods to be overridden with injected behavior
@Override
@@ -317,4 +327,17 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
}
return super.alterPartitions(catName, dbname, name, part_vals, newParts, writeId, queryWriteIdList);
}
+
+ @Override
+ public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
+ throws InvalidObjectException, MetaException {
+ if (addPartitionsModifier != null) {
+ Boolean success = addPartitionsModifier.apply(parts);
+ if ((success != null) && !success) {
+ throw new MetaException("InjectableBehaviourObjectStore: Invalid addPartitions operation on Catalog : "
+ + catName + " DB: " + dbName + " table: " + tblName);
+ }
+ }
+ return super.addPartitions(catName, dbName, tblName, parts);
+ }
}
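Note: the new addPartitions hook lets a test force a metastore failure while a batch of partitions is being added, for example to exercise the resume-from-checkpoint path in the bootstrap load. A usage sketch follows; the setter, reset method and callback type are those added in the patch above, while the one-time-failure logic and class name are hypothetical test scaffolding.

    import java.util.List;
    import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
    import org.apache.hadoop.hive.metastore.api.Partition;

    public class AddPartitionFailureExample {
      static void injectOneTimeFailure() {
        InjectableBehaviourObjectStore.setAddPartitionsBehaviour(
            new com.google.common.base.Function<List<Partition>, Boolean>() {
              private boolean failedOnce = false;
              @Override
              public Boolean apply(List<Partition> parts) {
                if (!failedOnce) {
                  failedOnce = true;
                  return false;   // first addPartitions call throws MetaException
                }
                return true;      // subsequent calls succeed
              }
            });
      }

      static void cleanup() {
        // Always restore default behaviour so other tests are unaffected.
        InjectableBehaviourObjectStore.resetAddPartitionModifier();
      }
    }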