You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/04/09 11:52:57 UTC
[hive] branch master updated: HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 796a9c5 HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
796a9c5 is described below
commit 796a9c55757e8c384f86cfd2fd8842bbb2cdd31f
Author: Pravin Kumar Sinha <ma...@gmail.com>
AuthorDate: Thu Apr 9 17:22:37 2020 +0530
HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../ql/parse/TestMetaStoreEventListenerInRepl.java | 2 +-
.../hive/ql/parse/TestReplicationScenarios.java | 52 +++++++++-
.../TestReplicationScenariosAcrossInstances.java | 2 +-
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 110 ++++++++++-----------
.../hive/ql/parse/ImportSemanticAnalyzer.java | 19 +++-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 33 +++++++
.../repl/dump/events/AbstractEventHandler.java | 43 +++-----
.../repl/dump/events/AddPartitionHandler.java | 6 +-
.../parse/repl/dump/events/CommitTxnHandler.java | 23 +----
.../parse/repl/dump/events/CreateTableHandler.java | 7 +-
.../ql/parse/repl/dump/events/InsertHandler.java | 25 ++---
11 files changed, 186 insertions(+), 136 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
index 703d16f..5dbee9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -181,7 +181,7 @@ public class TestMetaStoreEventListenerInRepl {
eventsMap = prepareIncData(primaryDbName);
LOG.info(testName.getMethodName() + ": first incremental dump and load.");
- primary.run("use " + primaryDbName)
+ WarehouseInstance.Tuple incre = primary.run("use " + primaryDbName)
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c79d4c3..fa96b87 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1506,6 +1506,13 @@ public class TestReplicationScenarios {
Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+ //Verify dump data structure
+ Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf);
+ verifyDataFileExist(fs, hiveDumpDir, null, new Path(unptnLocn).getName());
+ verifyDataListFileDoesNotExist(fs, hiveDumpDir, null);
+
+
verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -1526,7 +1533,13 @@ public class TestReplicationScenarios {
+ ".ptned WHERE b=2", driver);
verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
- incrementalLoadAndVerify(dbName, replDbName);
+ incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+ hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ verifyDataFileExist(fs, hiveDumpDir, "b=1", new Path(ptnLocn1).getName());
+ verifyDataFileExist(fs, hiveDumpDir, "b=2", new Path(ptnLocn2).getName());
+ verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=1");
+ verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=2");
+
verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -3788,6 +3801,43 @@ public class TestReplicationScenarios {
assertFalse(success);
}
+ private void verifyDataFileExist(FileSystem fs, Path hiveDumpDir, String part, String dataFile) throws IOException {
+ FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+ boolean dataFileFound = false;
+ for (FileStatus eventFileStatus: eventFileStatuses) {
+ String dataRelativePath = null;
+ if (part == null) {
+ dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + dataFile;
+ } else {
+ dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + part + File.separator + dataFile;
+ }
+ if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+ dataFileFound = true;
+ break;
+ }
+ }
+ assertTrue(dataFileFound);
+ }
+
+ private void verifyDataListFileDoesNotExist(FileSystem fs, Path hiveDumpDir, String part)
+ throws IOException {
+ FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+ boolean dataListFileFound = false;
+ for (FileStatus eventFileStatus: eventFileStatuses) {
+ String dataRelativePath = null;
+ if (part == null) {
+ dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + EximUtil.FILES_NAME;
+ } else {
+ dataRelativePath = part + File.separator + EximUtil.FILES_NAME;
+ }
+ if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+ dataListFileFound = true;
+ break;
+ }
+ }
+ assertFalse(dataListFileFound);
+ }
+
private void verifyRunWithPatternMatch(String cmd, String key, String pattern, IDriver myDriver) throws IOException {
run(cmd, myDriver);
List<String> results = getOutput(myDriver);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 33124c8..e1b8b81 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -925,7 +925,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
Path path = new Path(hiveDumpDir);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] fileStatus = fs.listStatus(path);
- int numEvents = fileStatus.length - 3; //one is metadata file and one data dir and one is _dump ack
+ int numEvents = fileStatus.length - 2; //one is metadata file and one is _dump ack
replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3c7274c..51c3b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -102,35 +102,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}
}
- private void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
- throws IOException {
- for (ReplChangeManager.FileInfo srcFile : srcFiles) {
- if (srcFile.isUseSourcePath()) {
- continue;
- }
- String destFileName = srcFile.getCmPath().getName();
- Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
- Path destFile = new Path(destRoot, destFileName);
- if (dstFs.exists(destFile)) {
- String destFileWithSourceName = srcFile.getSourcePath().getName();
- Path newDestFile = new Path(destRoot, destFileWithSourceName);
-
- // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
- // directly to table path (bypassing staging directory) then there might be some stale files from previous
- // incomplete/failed load. No need of recycle as this is a case of stale file.
- if (dstFs.exists(newDestFile)) {
- LOG.debug(" file " + newDestFile + " is deleted before renaming");
- dstFs.delete(newDestFile, true);
- }
- boolean result = dstFs.rename(destFile, newDestFile);
- if (!result) {
- throw new IllegalStateException(
- "could not rename " + destFile.getName() + " to " + newDestFile.getName());
- }
- }
- }
- }
-
@Override
public int execute() {
LOG.debug("ReplCopyTask.execute()");
@@ -190,32 +161,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}
if (work.isCopyToMigratedTxnTable()) {
- if (work.isNeedCheckDuplicateCopy()) {
- updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
- if (srcFiles.isEmpty()) {
- LOG.info("All files are already present in the base directory. Skipping copy task.");
- return 0;
- }
+ if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+ return 0;
}
- // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
- // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
- // The toPath received in ReplCopyWork is pointing to table/partition base location.
- // So, just need to append the base or delta directory.
- // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
- // hence need to create base directory. If false, then it is repl load for regular insert into or
- // load flow and hence just create delta directory.
- Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
- if (writeId == null) {
+
+ Path modifiedToPath = getModifiedToPath(toPath);
+ if (modifiedToPath == null) {
console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
return 6;
}
- // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
- // duplicate copy from the source. Check HIVE-21197 for more detail.
- int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
- ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
- context.getHiveTxnManager().getStmtIdAndIncrement();
- toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+ toPath = modifiedToPath;
}
} else {
// This flow is usually taken for IMPORT command
@@ -231,12 +186,22 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return 0;
}
}
-
for (FileStatus oneSrc : srcs) {
console.printInfo("Copying file: " + oneSrc.getPath().toString());
LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
- srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
- oneSrc.getPath(), null));
+ srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null));
+ }
+ if (work.isCopyToMigratedTxnTable()) {
+ if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+ return 0;
+ }
+
+ Path modifiedToPath = getModifiedToPath(toPath);
+ if (modifiedToPath == null) {
+ console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
+ return 6;
+ }
+ toPath = modifiedToPath;
}
}
@@ -255,12 +220,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return 2;
}
// Copy the files from different source file systems to one destination directory
- new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath);
+ CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
+ copyUtils.copyAndVerify(toPath, srcFiles, fromPath);
// If a file is copied from CM path, then need to rename them using original source file name
// This is needed to avoid having duplicate files in target if same event is applied twice
// where the first event refers to source path and second event refers to CM path
- renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
+ copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
return 0;
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
@@ -269,6 +235,38 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}
}
+ private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles)
+ throws IOException {
+ if (work.isNeedCheckDuplicateCopy()) {
+ updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
+ ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
+ if (srcFiles.isEmpty()) {
+ LOG.info("All files are already present in the base directory. Skipping copy task.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Path getModifiedToPath(Path toPath) {
+ // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
+ // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
+ // The toPath received in ReplCopyWork is pointing to table/partition base location.
+ // So, just need to append the base or delta directory.
+ // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
+ // hence need to create base directory. If false, then it is repl load for regular insert into or
+ // load flow and hence just create delta directory.
+ Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+ if (writeId == null) {
+ return null;
+ }
+ // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
+ // duplicate copy from the source. Check HIVE-21197 for more detail.
+ int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
+ ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
+ context.getHiveTxnManager().getStmtIdAndIncrement();
+ return new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+ }
private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath)
throws IOException {
Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7354a3e..80515e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -77,6 +77,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -505,9 +506,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
- boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
- isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+ isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
@@ -597,6 +597,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
+ if (replicationSpec.isInReplicationScope()
+ && !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
+ Path partLocation = new Path(partSpec.getLocation());
+ Path dataDirBase = partLocation.getParent();
+ String bucketDir = partLocation.getName();
+ for (int i=1; i<partSpec.getPartSpec().size(); i++) {
+ bucketDir = dataDirBase.getName() + File.separator + bucketDir;
+ dataDirBase = dataDirBase.getParent();
+ }
+ String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
+ srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
+ }
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ partSpecToString(partSpec.getPartSpec())
@@ -641,9 +653,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
- boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
- x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+ x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index fd70260..23cd128 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -232,6 +232,39 @@ public class CopyUtils {
return pathList;
}
+ /* If a file is copied from CM path, then need to rename them using original source file name
+ This is needed to avoid having duplicate files in target if same event is applied twice
+ where the first event refers to source path and second event refers to CM path */
+
+ public void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
+ throws IOException {
+ for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+ if (srcFile.isUseSourcePath()) {
+ continue;
+ }
+ String destFileName = srcFile.getCmPath().getName();
+ Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+ Path destFile = new Path(destRoot, destFileName);
+ if (dstFs.exists(destFile)) {
+ String destFileWithSourceName = srcFile.getSourcePath().getName();
+ Path newDestFile = new Path(destRoot, destFileWithSourceName);
+
+ // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
+ // directly to table path (bypassing staging directory) then there might be some stale files from previous
+ // incomplete/failed load. No need of recycle as this is a case of stale file.
+ if (dstFs.exists(newDestFile)) {
+ LOG.debug(" file " + newDestFile + " is deleted before renaming");
+ dstFs.delete(newDestFile, true);
+ }
+ boolean result = dstFs.rename(destFile, newDestFile);
+ if (!result) {
+ throw new IllegalStateException(
+ "could not rename " + destFile.getName() + " to " + newDestFile.getName());
+ }
+ }
+ }
+ }
+
// Check if the source file unmodified even after copy to see if we copied the right file
private boolean isSourceFileMismatch(FileSystem sourceFs, ReplChangeManager.FileInfo srcFile) throws IOException {
// If source is already CM path, the checksum will be always matching
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 049c06b..fed0d50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
@@ -37,8 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -89,42 +88,28 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
return event.getEventId();
}
- protected void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter,
- Context withinContext)
+ protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext)
throws IOException, LoginException, MetaException, HiveFatalException {
HiveConf hiveConf = withinContext.hiveConf;
String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
if (!Utils.shouldDumpMetaDataOnly(withinContext.hiveConf)) {
- Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME);
- List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+ Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+ if (table.isPartitioned()) {
+ dataPath = new Path(dataPath, ptn.getName());
+ }
String[] decodedURISplits = ReplChangeManager.decodeFileUri(file);
- String srcDataFile = decodedURISplits[0];
- Path srcDataPath = new Path(srcDataFile);
+ Path srcDataPath = new Path(decodedURISplits[0]);
if (dataPath.toUri().getScheme() == null) {
dataPath = new Path(srcDataPath.toUri().getScheme(), srcDataPath.toUri().getAuthority(), dataPath.toString());
}
- String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName();
- String srcDataFileRelativePath = null;
- if (srcDataFile.contains(table.getPath().toString())) {
- srcDataFileRelativePath = srcDataFile.substring(table.getPath().toString().length() + 1);
- } else if (decodedURISplits[3] == null) {
- srcDataFileRelativePath = srcDataPath.getName();
- } else {
- srcDataFileRelativePath = srcDataFileRelativePath + File.separator + srcDataPath.getName();
- }
- Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath);
- String encodedTargetPath = ReplChangeManager.encodeFileUri(
- targetPath.toString(), decodedURISplits[1], decodedURISplits[3]);
- ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+ List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+ ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
- filePaths.add(f);
- FileSystem dstFs = targetPath.getFileSystem(hiveConf);
- Path finalTargetPath = targetPath.getParent();
- if (decodedURISplits[3] != null) {
- finalTargetPath = finalTargetPath.getParent();
- }
- new CopyUtils(distCpDoAsUser, hiveConf, dstFs).copyAndVerify(finalTargetPath, filePaths, srcDataPath);
- fileListWriter.write(encodedTargetPath + "\n");
+ filePaths.add(fileInfo);
+ FileSystem dstFs = dataPath.getFileSystem(hiveConf);
+ CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+ copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath);
+ copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index a06b90d..8506532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -108,10 +108,8 @@ class AddPartitionHandler extends AbstractEventHandler {
Iterable<String> files = partitionFilesIter.next().getFiles();
if (files != null) {
// encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
- for (String file : files) {
- writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
- }
+ for (String file : files) {
+ writeFileEntry(qlMdTable, qlPtn, file, withinContext);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index dc87506..36369db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -39,12 +39,9 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.fs.FileSystem;
import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
@@ -59,19 +56,11 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
return deserializer.getCommitTxnMessage(stringRepresentation);
}
- private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
- Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
- FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
- return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
- }
-
- private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable<String> files, Path dataPath)
+ private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext)
throws IOException, LoginException, MetaException, HiveFatalException {
// encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
- for (String file : files) {
- writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
- }
+ for (String file : files) {
+ writeFileEntry(qlMdTable, ptn, file, withinContext);
}
}
@@ -92,12 +81,10 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
withinContext.hiveConf);
if ((null == qlPtns) || qlPtns.isEmpty()) {
- Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
- writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath);
+ writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
} else {
for (int idx = 0; idx < qlPtns.size(); idx++) {
- Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName());
- writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath);
+ writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 7a6ddf9..c732b21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -79,14 +79,11 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
withinContext.replicationSpec,
withinContext.hiveConf);
- Path dataPath = new Path(withinContext.eventRoot, "data");
Iterable<String> files = eventMessage.getFiles();
if (files != null) {
// encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
- for (String file : files) {
- writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
- }
+ for (String file : files) {
+ writeFileEntry(qlMdTable, null, file, withinContext);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 4e02620..701dd6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -80,25 +80,16 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
withinContext.hiveConf);
Iterable<String> files = eventMessage.getFiles();
+ /*
+ * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
+ * But, Insert event is generated for each partition to which the data is inserted.
+ * So, qlPtns list will have only one entry.
+ */
+ Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
if (files != null) {
- Path dataPath;
- if ((null == qlPtns) || qlPtns.isEmpty()) {
- dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
- } else {
- /*
- * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
- * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
- * will have only one entry.
- */
- assert(1 == qlPtns.size());
- dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
- }
-
// encoded filename/checksum of files, write into _files
- try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
- for (String file : files) {
- writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
- }
+ for (String file : files) {
+ writeFileEntry(qlMdTable, ptn, file, withinContext);
}
}