You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/04/09 11:30:20 UTC
[hive] branch master updated: HIVE-23039: Checkpointing for repl
dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha,
Anishek Agarwal)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 997fb16 HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha, Anishek Agarwal)
997fb16 is described below
commit 997fb162eb58d2422d36d6d3171c59b8c40754a7
Author: Aasha Medhi <aa...@gmail.com>
AuthorDate: Thu Apr 9 17:00:05 2020 +0530
HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha, Anishek Agarwal)
---
.../hive/ql/parse/TestReplicationScenarios.java | 32 ++-
.../parse/TestReplicationScenariosAcidTables.java | 310 +++++++++++++++++++++
.../TestReplicationScenariosAcrossInstances.java | 11 +-
.../TestReplicationScenariosExternalTables.java | 56 ++--
.../parse/TestTableLevelReplicationScenarios.java | 13 +-
.../org/apache/hadoop/hive/ql/exec/ExportTask.java | 2 +-
.../hadoop/hive/ql/exec/repl/DirCopyWork.java | 1 +
.../apache/hadoop/hive/ql/exec/repl/ReplAck.java | 35 +++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 120 +++++---
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 4 +-
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 7 +-
.../ql/exec/repl/bootstrap/events/TableEvent.java | 2 +
.../events/filesystem/BootstrapEventsIterator.java | 6 +-
.../events/filesystem/DatabaseEventsIterator.java | 22 +-
.../events/filesystem/FSPartitionEvent.java | 9 +-
.../bootstrap/events/filesystem/FSTableEvent.java | 20 +-
.../repl/bootstrap/load/table/LoadPartitions.java | 10 +-
.../exec/repl/bootstrap/load/table/LoadTable.java | 5 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 +-
.../org/apache/hadoop/hive/ql/parse/EximUtil.java | 1 +
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 7 +-
.../hive/ql/parse/repl/dump/PartitionExport.java | 6 +-
.../hive/ql/parse/repl/dump/TableExport.java | 98 +++++--
.../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 11 +-
24 files changed, 660 insertions(+), 138 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index efe9fff..c79d4c3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -98,6 +98,8 @@ import java.util.Map;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -315,8 +317,8 @@ public class TestReplicationScenarios {
FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror);
verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
@@ -367,8 +369,8 @@ public class TestReplicationScenarios {
advanceDumpDir();
FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror);
verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -452,7 +454,7 @@ public class TestReplicationScenarios {
Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
//delete load ack to reload the same dump
- loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true);
loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
@@ -466,7 +468,7 @@ public class TestReplicationScenarios {
loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
//delete load ack to reload the same dump
- loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true);
loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
@@ -902,8 +904,8 @@ public class TestReplicationScenarios {
Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
- assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
// VERIFY tables and partitions on destination for equivalence.
verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
@@ -1439,8 +1441,8 @@ public class TestReplicationScenarios {
Path path = new Path(System.getProperty("test.warehouse.dir", ""));
String tableRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "unptned";
Path srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptnedFileName1);
- String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator
- + "unptned" + File.separator + EximUtil.DATA_PATH_NAME +File.separator + unptnedFileName1;
+ String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME
+ + File.separator + dbName.toLowerCase() + File.separator + "unptned" +File.separator + unptnedFileName1;
Path tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath);
//A file in table at src location should be copied to $dumplocation/hive/<db>/<table>/data/<unptned_fileName>
verifyChecksum(srcFileLocation, tgtFileLocation, true);
@@ -1449,9 +1451,10 @@ public class TestReplicationScenarios {
String partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=1";
srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptnedFileName1);
- tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase()
+ tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME
+ + File.separator + dbName.toLowerCase()
+ File.separator + "ptned" + File.separator + "b=1" + File.separator
- + EximUtil.DATA_PATH_NAME +File.separator + ptnedFileName1;
+ + ptnedFileName1;
tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath);
//A partitioned file in table at src location should be copied to
// $dumplocation/hive/<db>/<table>/<partition>/data/<unptned_fileName>
@@ -1723,7 +1726,8 @@ public class TestReplicationScenarios {
Tuple incrementalDump = replDumpDb(dbName);
//Remove the dump ack file, so that dump is treated as an invalid dump.
- String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT;
+ String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+ + DUMP_ACKNOWLEDGEMENT.toString();
Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath);
Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(),
"old_" + dumpFinishedAckFilePath.getName());
@@ -1809,7 +1813,7 @@ public class TestReplicationScenarios {
FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf);
fs.delete(fileToDelete, true);
assertTrue(fs.exists(bootstrapDumpDir));
- assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
+ assertTrue(fs.exists(new Path(bootstrapDumpDir, DUMP_ACKNOWLEDGEMENT.toString())));
loadAndVerify(replDbName, dbName, incrDump.lastReplId);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 1e25598..39722a0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.parse;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -32,10 +33,12 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.Behaviour
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -51,7 +54,11 @@ import java.util.Collections;
import java.util.Map;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables.
@@ -655,4 +662,307 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertEquals("REPL LOAD * is not supported", e.getMessage());
}
}
+
+ @Test
+ public void testCheckPointingDataDumpFailure() throws Throwable {
+ //To force distcp copy
+ List<String> dumpClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+ long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+ Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+ Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase());
+ Path tablet1Path = new Path(dbDataPath, "t1");
+ Path tablet2Path = new Path(dbDataPath, "t2");
+ //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ FileStatus[] statuses = fs.listStatus(tablet2Path);
+ //Delete t2 data
+ fs.delete(statuses[0].getPath(), true);
+ long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+ long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+ long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+ //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater
+ WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+ assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+ assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+ assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+ assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+ replica.load(replicatedDbName, primaryDbName)
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "2", "3"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"11", "21"});
+ }
+
+ @Test
+ public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable {
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+ long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+ Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+ Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+ Path tablet1Path = new Path(dbPath, "t1");
+ Path tablet2Path = new Path(dbPath, "t2");
+ //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ FileStatus[] statuses = fs.listStatus(tablet2Path);
+ //Delete t2 data
+ fs.delete(statuses[0].getPath(), true);
+ long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+ long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+ long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+ //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater
+ WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName);
+ assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ //File is copied again as we are using regular copy
+ assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime());
+ assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime());
+ assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+ assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+ replica.load(replicatedDbName, primaryDbName)
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "2", "3"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"11", "21"});
+ }
+
+ @Test
+ public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
+ //To force distcp copy
+ List<String> dumpClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+ Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+ Path tablet1Path = new Path(dbPath, "t1");
+ Path tablet2Path = new Path(dbPath, "t2");
+ long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+ //Delete table 2 data
+ FileStatus[] statuses = fs.listStatus(tablet2Path);
+ //Delete t2 data
+ fs.delete(statuses[0].getPath(), true);
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+
+ //Do another dump. It should only dump table t2. Also insert new data in existing tables.
+ // New data should be there in target
+ primary.run("use " + primaryDbName)
+ .run("insert into t2 values (13)")
+ .run("insert into t2 values (24)")
+ .run("insert into t1 values (4)")
+ .dump(primaryDbName, dumpClause);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1", "2", "3", "4"})
+ .run("select * from t2")
+ .verifyResults(new String[]{"11", "21", "13", "24"});
+ assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+ assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+ }
+
+ @Test
+ public void testCheckPointingWithNewTablesAdded() throws Throwable {
+ //To force distcp copy
+ List<String> dumpClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+ Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+ Path tablet1Path = new Path(dbPath, "t1");
+ Path tablet2Path = new Path(dbPath, "t2");
+ long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+ long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+ //Delete table 2 data
+ FileStatus[] statuses = fs.listStatus(tablet2Path);
+ fs.delete(statuses[0].getPath(), true);
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+
+ // Do another dump. It should only dump table t2 and next table.
+ // Also insert new tables. New tables will be there in target
+ primary.run("use " + primaryDbName)
+ .run("insert into t2 values (13)")
+ .run("insert into t2 values (24)")
+ .run("create table t3(a string) STORED AS TEXTFILE")
+ .run("insert into t3 values (1)")
+ .run("insert into t3 values (2)")
+ .run("insert into t3 values (3)")
+ .dump(primaryDbName, dumpClause);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1", "2", "3"})
+ .run("select * from t2")
+ .verifyResults(new String[]{"11", "21", "13", "24"})
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t3"})
+ .run("select * from t3")
+ .verifyResults(new String[]{"1", "2", "3"});
+ assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+ assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+ assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+ }
+
+ @Test
+ public void testCheckPointingWithSourceTableDeleted() throws Throwable {
+ //To force distcp copy
+ List<String> dumpClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+
+
+ //Delete dump ack and t2 data, Also drop table. New data will be there in target
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+ Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+ Path tablet2Path = new Path(dbPath, "t2");
+ FileStatus[] statuses = fs.listStatus(tablet2Path);
+ //Delete t2 data.
+ fs.delete(statuses[0].getPath(), true);
+ //Drop table t1. Target shouldn't have t1 table as metadata dump is rewritten
+ primary.run("use " + primaryDbName)
+ .run("drop table t1")
+ .dump(primaryDbName, dumpClause);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t2"})
+ .run("select * from t2")
+ .verifyResults(new String[]{"11", "21"});
+ }
+
+ @Test
+ public void testCheckPointingMetadataDumpFailure() throws Throwable {
+ //To force distcp copy
+ List<String> dumpClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+ .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (2)")
+ .run("insert into t1 values (3)")
+ .run("insert into t2 values (11)")
+ .run("insert into t2 values (21)")
+ .dump(primaryDbName);
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+
+ //Delete dump ack and metadata ack, everything should be rewritten in a new dump dir
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+ fs.delete(new Path(dumpPath, "_dumpmetadata"), true);
+ assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ //Insert new data
+ primary.run("insert into "+ primaryDbName +".t1 values (12)");
+ primary.run("insert into "+ primaryDbName +".t1 values (13)");
+ //Do another dump. It should dump everything in a new dump dir
+ // checkpointing will not be used
+ WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select * from t2")
+ .verifyResults(new String[]{"11", "21"})
+ .run("select * from t1")
+ .verifyResults(new String[]{"1", "2", "3", "12", "13"});
+ assertNotEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+ dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 56b27a5..33124c8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -55,6 +55,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1093,13 +1094,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// To retry with same dump delete the load ack
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
- hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
// Retry with same dump with which it was already loaded also fails.
replica.loadFailure(replicatedDbName, primaryDbName);
// To retry with same dump delete the load ack
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
- hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
// Retry from same dump when the database is empty is also not allowed.
replica.run("drop table t1")
.run("drop table t2")
@@ -1344,7 +1345,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
- + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ + LOAD_ACKNOWLEDGEMENT.toString()), true);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
@@ -1370,7 +1371,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
replica.load(replicatedDbName, primaryDbName, withConfigs);
//delete load ack to reuse the dump
new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation
- + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
@@ -1423,7 +1424,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
- + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+ + LOAD_ACKNOWLEDGEMENT.toString()), true);
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 1325789..b7a9888 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -106,8 +106,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, dumpWithClause);
// the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+ Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+ EximUtil.METADATA_PATH_NAME);
assertFalse(primary.miniDFSCluster.getFileSystem()
- .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName))));
+ .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName))));
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("repl status " + replicatedDbName)
@@ -126,8 +128,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, dumpWithClause);
// the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+ metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+ EximUtil.METADATA_PATH_NAME);
assertFalse(primary.miniDFSCluster.getFileSystem()
- .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null))));
+ .exists(new Path(metadataPath + relativeExtInfoPath(null))));
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
@@ -152,7 +156,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, withClauseOptions);
// verify that the external table info is written correctly for bootstrap
- assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName);
+ assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
@@ -184,7 +188,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, withClauseOptions);
// verify that the external table info is written correctly for incremental
- assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation);
+ assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true);
replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
@@ -202,7 +206,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dumpWithCommand("repl dump " + primaryDbName);
// verify that the external table info is written correctly for incremental
- assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation);
+ assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
}
/**
@@ -310,7 +314,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, withClause);
- assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName);
+ assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
@@ -333,7 +337,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into t2 partition(country='australia') values ('sydney')")
.dump(primaryDbName, withClause);
- assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation);
+ assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, true);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
@@ -420,7 +424,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("alter table t1 add partition(country='us')")
.dump(primaryDbName, withClause);
- assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
+ assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true);
// Add new data externally, to a partition, but under the partition level top directory
// Also, it is added after dumping the events so data should not be seen at target after REPL LOAD.
@@ -467,7 +471,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// Repl load with zero events but external tables location info should present.
tuple = primary.dump(primaryDbName, withClause);
- assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
+ assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true);
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
@@ -519,8 +523,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, dumpWithClause);
// the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+ Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+ EximUtil.METADATA_PATH_NAME);
assertFalse(primary.miniDFSCluster.getFileSystem()
- .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName))));
+ .exists(new Path(metadataPath + relativeExtInfoPath(null))));
replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
@@ -543,11 +549,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.dump(primaryDbName, dumpWithClause);
// the _external_tables_file info should be created as external tables are to be replicated.
+ Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertTrue(primary.miniDFSCluster.getFileSystem()
- .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null))));
+ .exists(new Path(hivePath + relativeExtInfoPath(null))));
// verify that the external table info is written correctly for incremental
- assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation);
+ assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation, true);
// _bootstrap directory should be created as bootstrap enabled on external tables.
String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
@@ -762,7 +769,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
}
// Only table t2 should exist in the data location list file.
- assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation);
+ assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation, true);
// The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have
// inserted data.
@@ -917,20 +924,29 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl);
}
- private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException {
- assertExternalFileInfo(expected, dumplocation, null);
+ private void assertExternalFileInfo(List<String> expected, String dumplocation,
+ boolean isIncremental) throws IOException {
+ assertExternalFileInfo(expected, dumplocation, null, isIncremental);
}
- private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName)
+ private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName,
+ boolean isIncremental)
throws IOException {
- Path externalTableInfoFile = new Path(dumplocation, relativeExtInfoPath(dbName));
+ Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
+ Path externalTableInfoFile;
+ if (isIncremental) {
+ externalTableInfoFile = new Path(hivePath + relativeExtInfoPath(dbName));
+ } else {
+ externalTableInfoFile = new Path(metadataPath + relativeExtInfoPath(dbName));
+ }
ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile);
}
- private String relativeExtInfoPath(String dbName) {
+ private String relativeExtInfoPath(String dbName) {
if (dbName == null) {
- return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + FILE_NAME;
+ return File.separator + FILE_NAME;
} else {
- return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + FILE_NAME;
+ return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME;
}
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 78251f2..93e24ef 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -242,7 +243,12 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
// Check if the DB dump path have any tables other than the ones listed in bootstrappedTables.
Path dbPath = new Path(dumpPath, primaryDbName);
- FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath);
+ FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+ }
+ });
Assert.assertEquals(fileStatuses.length, bootstrappedTables.length);
// Eg: _bootstrap/<db_name>/t2, _bootstrap/<db_name>/t3 etc
@@ -500,13 +506,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
.dump(replPolicy, dumpWithClause);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+ Path metaDataPath = new Path(hiveDumpDir, EximUtil.METADATA_PATH_NAME);
// the _external_tables_file info should be created as external tables are to be replicated.
Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
- .exists(new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME)));
+ .exists(new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME)));
// Verify that the external table info contains only table "a2".
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
- new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME));
+ new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME));
replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index 56f0c93..d3e9413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -46,7 +46,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
TableExport.Paths exportPaths = new TableExport.Paths(
work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false);
Hive db = getHive();
- LOG.debug("Exporting data to: {}", exportPaths.exportRootDir());
+ LOG.debug("Exporting data to: {}", exportPaths.metadataExportRootDir());
work.acidPostProcess(db);
TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
work.getReplicationSpec(), db, null, conf, work.getMmContext());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
index efef052..46f9bb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
Explain.Level.DEFAULT,
Explain.Level.EXTENDED })
public class DirCopyWork implements Serializable {
+ private static final long serialVersionUID = 1L;
private final Path fullyQualifiedSourcePath;
private final Path fullyQualifiedTargetPath;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
new file mode 100644
index 0000000..db8db5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+/**
+ * ReplAck, used for repl acknowledgement constants.
+ */
+public enum ReplAck {
+ DUMP_ACKNOWLEDGEMENT("_finished_dump"),
+ LOAD_ACKNOWLEDGEMENT("_finished_load");
+ private String ack;
+ ReplAck(String ack) {
+ this.ack = ack;
+ }
+
+ @Override
+ public String toString() {
+ return ack;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 69f6ffe..2e0af02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -93,6 +93,7 @@ import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -135,20 +136,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
.getBytes(StandardCharsets.UTF_8.name())));
- Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot);
+ Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
//If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
- if (shouldDump(previousHiveDumpPath)) {
- Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+ if (shouldDump(previousValidHiveDumpPath)) {
+ Path currentDumpPath = getCurrentDumpPath(dumpRoot);
Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
// Initialize ReplChangeManager instance since we will require it to encode file URI.
ReplChangeManager.getInstance(conf);
Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
- if (previousHiveDumpPath == null) {
+ if (previousValidHiveDumpPath == null) {
lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
} else {
- work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
+ work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath));
lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
}
work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
@@ -166,6 +167,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return 0;
}
+ private Path getCurrentDumpPath(Path dumpRoot) throws IOException {
+ Path previousDumpPath = getPreviousDumpPath(dumpRoot);
+ if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) {
+ //Resume previous dump
+ return previousDumpPath;
+ } else {
+ return new Path(dumpRoot, getNextDumpDir());
+ }
+ }
+
private void initiateDataCopyTasks() throws SemanticException, IOException {
TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
List<Task<?>> childTasks = new ArrayList<>();
@@ -183,7 +194,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private void finishRemainingTasks() throws SemanticException, IOException {
prepareReturnValues(work.getResultValues());
Path dumpAckFile = new Path(work.getCurrentDumpPath(),
- ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT);
+ ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+ + ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
Utils.create(dumpAckFile, conf);
deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
}
@@ -233,7 +245,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return 0L;
}
- private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+ private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException {
FileStatus latestValidStatus = null;
FileSystem fs = dumpRoot.getFileSystem(conf);
if (fs.exists(dumpRoot)) {
@@ -241,8 +253,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
for (FileStatus status : statuses) {
LOG.info("Evaluating previous dump dir path:{}", status.getPath());
if (latestValidStatus == null) {
- latestValidStatus = validDump(fs, status.getPath()) ? status : null;
- } else if (validDump(fs, status.getPath())
+ latestValidStatus = validDump(status.getPath()) ? status : null;
+ } else if (validDump(status.getPath())
&& status.getModificationTime() > latestValidStatus.getModificationTime()) {
latestValidStatus = status;
}
@@ -254,10 +266,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return latestDumpDir;
}
- private boolean validDump(FileSystem fs, Path dumpDir) throws IOException {
+ private boolean validDump(Path dumpDir) throws IOException {
//Check if it was a successful dump
- Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR);
- return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT));
+ if (dumpDir != null) {
+ FileSystem fs = dumpDir.getFileSystem(conf);
+ Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR);
+ return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()));
+ }
+ return false;
}
private boolean shouldDump(Path previousDumpPath) throws IOException {
@@ -267,7 +283,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return true;
} else {
FileSystem fs = previousDumpPath.getFileSystem(conf);
- return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT));
+ return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString()));
}
}
@@ -471,8 +487,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Dump the table to be bootstrapped if required.
if (shouldBootstrapDumpTable(table)) {
HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+ Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME);
managedTableCopyPaths.addAll(
- dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+ dumpTable(dbName, tableName, validTxnList,
+ dbRoot, dbDataRoot, bootDumpBeginReplId,
hiveDb, tableTuple));
}
if (tableList != null && isTableSatifiesConfig(table)) {
@@ -611,16 +629,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
List<String> tableList;
LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
+ List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
+ List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
long timeoutInMs = HiveConf.getTimeVar(conf,
HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
-
String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
- List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
- List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
+ Path metadataPath = new Path(dumpRoot, EximUtil.METADATA_PATH_NAME);
+ if (shouldResumePreviousDump(dmd)) {
+ //clear the metadata. We need to rewrite the metadata as the write id list will be changed
+ //We can't reuse the previous write id as it might be invalid due to compaction
+ metadataPath.getFileSystem(conf).delete(metadataPath, true);
+ }
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("Dumping db: " + dbName);
-
// TODO : Currently we don't support separate table list for each database.
tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
Database db = hiveDb.getDatabase(dbName);
@@ -634,8 +656,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Utils.getAllTables(hiveDb, dbName, work.replScope).size(),
hiveDb.getAllFunctions().size());
replLogger.startLog();
- Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
- dumpFunctionMetadata(dbName, dumpRoot, hiveDb);
+ Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
+ Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
+ dumpFunctionMetadata(dbName, dbRoot, hiveDb);
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
Exception caught = null;
@@ -653,7 +676,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
LOG.debug("Adding table {} to external tables list", tblName);
extTableLocations.addAll(writer.dataLocationDump(tableTuple.object));
}
- managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+ managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
+ bootDumpBeginReplId,
hiveDb, tableTuple));
} catch (InvalidTableException te) {
// Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
@@ -677,11 +701,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
throw e;
} else {
LOG.error("failed to reset the db state for " + uniqueKey
- + " on failure of repl dump", e);
+ + " on failure of repl dump", e);
throw caught;
}
}
- if(caught != null) {
+ if (caught != null) {
throw caught;
}
}
@@ -689,21 +713,36 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
Long bootDumpEndReplId = currentNotificationId(hiveDb);
LOG.info("Preparing to return {},{}->{}",
- dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+ dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
dmd.write();
+
work.setDirCopyIterator(extTableCopyWorks.iterator());
work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
return bootDumpBeginReplId;
}
+ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
+ try {
+ return dumpMetaData.getEventFrom() != null;
+ } catch (Exception e) {
+ LOG.info("No previous dump present");
+ return false;
+ }
+ }
+
+ private boolean shouldResumePreviousDump(Path dumpPath) {
+ Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+ return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+ }
+
long currentNotificationId(Hive hiveDb) throws TException {
return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
}
- Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
- Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, false);
+ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception {
// TODO : instantiating FS objects are generally costly. Refactor
+ Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false);
FileSystem fs = dbRoot.getFileSystem(conf);
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
@@ -711,12 +750,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return dbRoot;
}
- List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot,
- long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+ List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
+ Path dbRootData, long lastReplId, Hive hiveDb,
+ HiveWrapper.Tuple<Table> tuple) throws Exception {
LOG.info("Bootstrap Dump for table " + tblName);
TableSpec tableSpec = new TableSpec(tuple.object);
TableExport.Paths exportPaths =
- new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
+ new TableExport.Paths(work.astRepresentationForErrorMsg, dbRootMetadata, dbRootData, tblName, conf, true);
String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
@@ -827,8 +867,26 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
- void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
- Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+ private Path getPreviousDumpPath(Path dumpRoot) throws IOException {
+ FileSystem fs = dumpRoot.getFileSystem(conf);
+ if (fs.exists(dumpRoot)) {
+ FileStatus[] statuses = fs.listStatus(dumpRoot);
+ if (statuses.length > 0) {
+ FileStatus latestValidStatus = statuses[0];
+ for (FileStatus status : statuses) {
+ LOG.info("Evaluating previous dump dir path:{}", status.getPath());
+ if (status.getModificationTime() > latestValidStatus.getModificationTime()) {
+ latestValidStatus = status;
+ }
+ }
+ return latestValidStatus.getPath();
+ }
+ }
+ return null;
+ }
+
+ void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
+ Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
List<String> functionNames = hiveDb.getFunctions(dbName, "*");
for (String functionName : functionNames) {
HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3427b59..a593555 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -65,6 +64,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
private final static int ZERO_TASKS = 0;
@@ -316,7 +316,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
|| (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
//All repl load tasks are executed and status is 0, create the task to add the acknowledgement
AckWork replLoadAckWork = new AckWork(
- new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT));
+ new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
if (this.childTasks.isEmpty()) {
this.childTasks.add(loadAckWorkTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 474d8c2..56efa32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.Constrain
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -91,8 +92,10 @@ public class ReplLoadWork implements Serializable {
this.constraintsIterator = null;
}
} else {
- this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, true, hiveConf);
- this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
+ this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
+ .toString(), dbNameToLoadIn, true, hiveConf);
+ this.constraintsIterator = new ConstraintEventsIterator(
+ new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME).toString(), hiveConf);
incrementalLoadTasksBuilder = null;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
index 10732b0..05ef274 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -42,4 +42,6 @@ public interface TableEvent extends BootstrapEvent {
* Exposing the FileSystem implementation outside which is what it should NOT do.
*/
Path metadataPath();
+
+ Path dataPath();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 1af6a4c..5bbe20c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
@@ -82,8 +81,11 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
throws IOException {
Path path = new Path(dumpDirectory);
FileSystem fileSystem = path.getFileSystem(hiveConf);
+ if (!fileSystem.exists(path)) {
+ throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
+ }
FileStatus[] fileStatuses =
- fileSystem.listStatus(new Path(dumpDirectory), ReplUtils.getBootstrapDirectoryFilter(fileSystem));
+ fileSystem.listStatus(path, ReplUtils.getBootstrapDirectoryFilter(fileSystem));
if ((fileStatuses == null) || (fileStatuses.length == 0)) {
throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index 72baee6..a311f7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -57,7 +57,7 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
this.hiveConf = hiveConf;
FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf);
// this is only there for the use case where we are doing table only replication and not database level
- if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) {
+ if (!fileSystem.exists(new Path(dbLevelPath, EximUtil.METADATA_NAME))) {
databaseEventProcessed = true;
}
@@ -129,7 +129,8 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
continue;
}
if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
- String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
+ String replacedString = next.getPath().toString()
+ .replace(dbLevelPath.toString(), "");
List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList());
@@ -174,7 +175,15 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
LOG.debug("functions directory: {}", next.toString());
return postProcessing(new FSFunctionEvent(next));
}
- return postProcessing(new FSTableEvent(hiveConf, next.toString()));
+ return postProcessing(new FSTableEvent(hiveConf, next.toString(),
+ new Path(getDbLevelDataPath(), next.getName()).toString()));
+ }
+
+ private Path getDbLevelDataPath() {
+ if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) {
+ return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME);
+ }
+ return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName());
}
private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) {
@@ -187,11 +196,14 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
private BootstrapEvent eventForReplicationState() {
if (replicationState.partitionState != null) {
BootstrapEvent
- bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState);
+ bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(),
+ new Path(getDbLevelDataPath(), previous.getName()).toString(),
+ replicationState);
replicationState = null;
return bootstrapEvent;
} else if (replicationState.lastTableReplicated != null) {
- FSTableEvent event = new FSTableEvent(hiveConf, previous.toString());
+ FSTableEvent event = new FSTableEvent(hiveConf, previous.toString(),
+ new Path(new Path(dbLevelPath, EximUtil.DATA_PATH_NAME), previous.getName()).toString());
replicationState = null;
return event;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index a79f5b7..2d82408 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -34,9 +34,9 @@ public class FSPartitionEvent implements PartitionEvent {
private final ReplicationState replicationState;
private final TableEvent tableEvent;
- FSPartitionEvent(HiveConf hiveConf, String metadataDir,
+ FSPartitionEvent(HiveConf hiveConf, String metadataDir, String dataDir,
ReplicationState replicationState) {
- tableEvent = new FSTableEvent(hiveConf, metadataDir);
+ tableEvent = new FSTableEvent(hiveConf, metadataDir, dataDir);
this.replicationState = replicationState;
}
@@ -87,4 +87,9 @@ public class FSPartitionEvent implements PartitionEvent {
public Path metadataPath() {
return tableEvent.metadataPath();
}
+
+ @Override
+ public Path dataPath() {
+ return tableEvent.dataPath();
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 6d38c03..cd3d619 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -52,16 +52,19 @@ import java.util.Map;
import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
public class FSTableEvent implements TableEvent {
- private final Path fromPath;
+ private final Path fromPathMetadata;
+ private final Path fromPathData;
private final MetaData metadata;
private final HiveConf hiveConf;
- FSTableEvent(HiveConf hiveConf, String metadataDir) {
+ FSTableEvent(HiveConf hiveConf, String metadataDir, String dataDir) {
try {
URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir));
- fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+ fromPathMetadata = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+ URI fromURIData = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(dataDir));
+ fromPathData = new Path(fromURIData.getScheme(), fromURIData.getAuthority(), fromURIData.getPath());
FileSystem fs = FileSystem.get(fromURI, hiveConf);
- metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ metadata = EximUtil.readMetaData(fs, new Path(fromPathMetadata, EximUtil.METADATA_NAME));
this.hiveConf = hiveConf;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -82,7 +85,12 @@ public class FSTableEvent implements TableEvent {
@Override
public Path metadataPath() {
- return fromPath;
+ return fromPathMetadata;
+ }
+
+ @Override
+ public Path dataPath() {
+ return fromPathData;
}
/**
@@ -150,7 +158,7 @@ public class FSTableEvent implements TableEvent {
//TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose.
for (Partition partition : metadata.getPartitions()) {
// TODO: this should ideally not create AddPartitionDesc per partition
- AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition);
+ AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPathMetadata, tblDesc, partition);
descs.add(partsDesc);
}
return descs;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 05a590a..b98f1f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.HiveTableName;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -57,7 +56,6 @@ import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -243,7 +241,7 @@ public class LoadPartitions {
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
- new Path(sourceWarehousePartitionLocation, EximUtil.DATA_PATH_NAME),
+ new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
stagingDir,
context.hiveConf, false
);
@@ -272,6 +270,12 @@ public class LoadPartitions {
return ptnRootTask;
}
+ private String getPartitionName(Path partitionMetadataFullPath) {
+ //Get partition name by removing the metadata base path.
+ //Needed for getting the data path
+ return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
+ }
+
/**
* This will create the move of partition data from temp path to actual path
*/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 82a3031..bb20687 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -223,7 +222,7 @@ public class LoadTable {
if (shouldCreateLoadTableTask) {
LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
- event.metadataPath());
+ event.dataPath());
parentTask.addDependentTask(loadTableTask);
}
tracker.addTask(tblRootTask);
@@ -272,7 +271,7 @@ public class LoadTable {
private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
Path fromURI) {
- Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME);
+ Path dataPath = fromURI;
Path tmpPath = tgtPath;
// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 211c3f0..939cbc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -94,10 +94,6 @@ public class ReplUtils {
// Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
// seen in production or in case of tests other than the ones where it's required.
public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
- //Acknowledgement for repl dump complete
- public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
- //Acknowledgement for repl load complete
- public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
/**
* Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
*/
@@ -240,7 +236,8 @@ public class ReplUtils {
try {
return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
&& !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
- && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+ && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
+ && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -251,7 +248,8 @@ public class ReplUtils {
return p -> {
try {
return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
- && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+ && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
+ && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index bc90ea1..5ada55f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -74,6 +74,7 @@ public class EximUtil {
public static final String METADATA_NAME = "_metadata";
public static final String FILES_NAME = "_files";
public static final String DATA_PATH_NAME = "data";
+ public static final String METADATA_PATH_NAME = "metadata";
private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8802139..c4ff070 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -54,6 +55,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -424,8 +426,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
- if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))
- && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) {
+ if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath,
+ ReplAck.DUMP_ACKNOWLEDGEMENT.toString()))
+ && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, LOAD_ACKNOWLEDGEMENT.toString()))) {
return hiveDumpPath;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index c3b1081..73dc606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -118,12 +117,13 @@ class PartitionExport {
// this the data copy
List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
forReplicationSpec, hiveConf);
- Path rootDataDumpDir = paths.partitionExportDir(partitionName);
+ Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName);
new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
.export(isExportTask);
+ Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
- new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
+ dataDumpDir);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 683f3c0..b11afe8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -198,22 +198,29 @@ public class TableExport {
public static class Paths {
private final String astRepresentationForErrorMsg;
private final HiveConf conf;
- //variable access should not be done and use exportRootDir() instead.
- private final Path _exportRootDir;
+ //metadataExportRootDir and dataExportRootDir variable access should not be done and use
+ // metadataExportRootDir() and dataExportRootDir() instead.
+ private final Path metadataExportRootDir;
+ private final Path dataExportRootDir;
private final FileSystem exportFileSystem;
- private boolean writeData, exportRootDirCreated = false;
+ private boolean writeData, metadataExportRootDirCreated = false, dataExportRootDirCreated = false;
- public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf,
+ public Paths(String astRepresentationForErrorMsg, Path dbMetadataRoot, Path dbDataRoot,
+ String tblName, HiveConf conf,
boolean shouldWriteData) throws SemanticException {
this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
this.conf = conf;
this.writeData = shouldWriteData;
- Path tableRoot = new Path(dbRoot, tblName);
- URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
- validateTargetDir(exportRootDir);
- this._exportRootDir = new Path(exportRootDir);
+ Path tableRootForMetadataDump = new Path(dbMetadataRoot, tblName);
+ Path tableRootForDataDump = new Path(dbDataRoot, tblName);
+ URI metadataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForMetadataDump.toUri().toString());
+ validateTargetDir(metadataExportRootDirUri);
+ URI dataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForDataDump.toUri().toString());
+ validateTargetDataDir(dataExportRootDirUri);
+ this.metadataExportRootDir = new Path(metadataExportRootDirUri);
+ this.dataExportRootDir = new Path(dataExportRootDirUri);
try {
- this.exportFileSystem = this._exportRootDir.getFileSystem(conf);
+ this.exportFileSystem = this.metadataExportRootDir.getFileSystem(conf);
} catch (IOException e) {
throw new SemanticException(e);
}
@@ -223,37 +230,58 @@ public class TableExport {
boolean shouldWriteData) throws SemanticException {
this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
this.conf = conf;
- this._exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+ this.metadataExportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+ this.dataExportRootDir = new Path(new Path(EximUtil.getValidatedURI(conf, path)), EximUtil.DATA_PATH_NAME);
this.writeData = shouldWriteData;
try {
- this.exportFileSystem = _exportRootDir.getFileSystem(conf);
+ this.exportFileSystem = metadataExportRootDir.getFileSystem(conf);
} catch (IOException e) {
throw new SemanticException(e);
}
}
- Path partitionExportDir(String partitionName) throws SemanticException {
- return exportDir(new Path(exportRootDir(), partitionName));
+ Path partitionMetadataExportDir(String partitionName) throws SemanticException {
+ return exportDir(new Path(metadataExportRootDir(), partitionName));
}
/**
- * Access to the {@link #_exportRootDir} should only be done via this method
+ * Access to the {@link #metadataExportRootDir} should only be done via this method
* since the creation of the directory is delayed until we figure out if we want
* to write something or not. This is specifically important to prevent empty non-native
* directories being created in repl dump.
*/
- public Path exportRootDir() throws SemanticException {
- if (!exportRootDirCreated) {
+ public Path metadataExportRootDir() throws SemanticException {
+ if (!metadataExportRootDirCreated) {
try {
- if (!exportFileSystem.exists(this._exportRootDir) && writeData) {
- exportFileSystem.mkdirs(this._exportRootDir);
+ if (!exportFileSystem.exists(this.metadataExportRootDir) && writeData) {
+ exportFileSystem.mkdirs(this.metadataExportRootDir);
}
- exportRootDirCreated = true;
+ metadataExportRootDirCreated = true;
} catch (IOException e) {
throw new SemanticException(e);
}
}
- return _exportRootDir;
+ return metadataExportRootDir;
+ }
+
+ /**
+ * Access to the {@link #dataExportRootDir} should only be done via this method
+ * since the creation of the directory is delayed until we figure out if we want
+ * to write something or not. This is specifically important to prevent empty non-native
+ * directories being created in repl dump.
+ */
+ public Path dataExportRootDir() throws SemanticException {
+ if (!dataExportRootDirCreated) {
+ try {
+ if (!exportFileSystem.exists(this.dataExportRootDir) && writeData) {
+ exportFileSystem.mkdirs(this.dataExportRootDir);
+ }
+ dataExportRootDirCreated = true;
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ return dataExportRootDir;
}
private Path exportDir(Path exportDir) throws SemanticException {
@@ -269,7 +297,7 @@ public class TableExport {
}
private Path metaDataExportFile() throws SemanticException {
- return new Path(exportRootDir(), EximUtil.METADATA_NAME);
+ return new Path(metadataExportRootDir(), EximUtil.METADATA_NAME);
}
/**
@@ -277,7 +305,7 @@ public class TableExport {
* Partition's data export directory is created within the export semantics of partition.
*/
private Path dataExportDir() throws SemanticException {
- return exportDir(new Path(exportRootDir(), EximUtil.DATA_PATH_NAME));
+ return exportDir(dataExportRootDir());
}
/**
@@ -310,6 +338,30 @@ public class TableExport {
throw new SemanticException(astRepresentationForErrorMsg, e);
}
}
+
+ /**
+ * this level of validation might not be required as the root directory in which we dump will
+ * be different for each run hence possibility of it having data is not there.
+ */
+ private void validateTargetDataDir(URI rootDirExportFile) throws SemanticException {
+ try {
+ FileSystem fs = FileSystem.get(rootDirExportFile, conf);
+ Path toPath = new Path(rootDirExportFile.getScheme(), rootDirExportFile.getAuthority(),
+ rootDirExportFile.getPath());
+ try {
+ FileStatus tgt = fs.getFileStatus(toPath);
+ // target exists
+ if (!tgt.isDirectory()) {
+ throw new SemanticException(
+ astRepresentationForErrorMsg + ": " + "Target is not a directory : "
+ + rootDirExportFile);
+ }
+ } catch (FileNotFoundException ignored) {
+ }
+ } catch (IOException e) {
+ throw new SemanticException(astRepresentationForErrorMsg, e);
+ }
+ }
}
public static class AuthEntities {
@@ -343,7 +395,7 @@ public class TableExport {
authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
}
}
- authEntities.outputs.add(toWriteEntity(paths.exportRootDir(), conf));
+ authEntities.outputs.add(toWriteEntity(paths.metadataExportRootDir(), conf));
} catch (Exception e) {
throw new SemanticException(e);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 81fac25..9973e9a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -84,11 +84,11 @@ public class TestReplDumpTask {
}
@Override
- void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) {
+ void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
}
@Override
- Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) {
+ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) {
return Mockito.mock(Path.class);
}
@@ -128,8 +128,9 @@ public class TestReplDumpTask {
private int tableDumpCount = 0;
@Override
- List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
- Path replDataDir, long lastReplId, Hive hiveDb,
+ List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
+ Path dbRootMetadata, Path dbRootData,
+ long lastReplId, Hive hiveDb,
HiveWrapper.Tuple<Table> tuple)
throws Exception {
tableDumpCount++;
@@ -146,7 +147,7 @@ public class TestReplDumpTask {
);
try {
- task.bootStrapDump(mock(Path.class), null, mock(Path.class), hive);
+ task.bootStrapDump(new Path("mock"), null, mock(Path.class), hive);
} finally {
verifyStatic();
Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));