You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/04/09 11:52:57 UTC

[hive] branch master updated: HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 796a9c5  HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
796a9c5 is described below

commit 796a9c55757e8c384f86cfd2fd8842bbb2cdd31f
Author: Pravin Kumar Sinha <ma...@gmail.com>
AuthorDate: Thu Apr 9 17:22:37 2020 +0530

    HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../ql/parse/TestMetaStoreEventListenerInRepl.java |   2 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  52 +++++++++-
 .../TestReplicationScenariosAcrossInstances.java   |   2 +-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   | 110 ++++++++++-----------
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  19 +++-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       |  33 +++++++
 .../repl/dump/events/AbstractEventHandler.java     |  43 +++-----
 .../repl/dump/events/AddPartitionHandler.java      |   6 +-
 .../parse/repl/dump/events/CommitTxnHandler.java   |  23 +----
 .../parse/repl/dump/events/CreateTableHandler.java |   7 +-
 .../ql/parse/repl/dump/events/InsertHandler.java   |  25 ++---
 11 files changed, 186 insertions(+), 136 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
index 703d16f..5dbee9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -181,7 +181,7 @@ public class TestMetaStoreEventListenerInRepl {
 
     eventsMap = prepareIncData(primaryDbName);
     LOG.info(testName.getMethodName() + ": first incremental dump and load.");
-    primary.run("use " + primaryDbName)
+    WarehouseInstance.Tuple incre = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
     replica.load(replicatedDbName, primaryDbName);
     ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c79d4c3..fa96b87 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1506,6 +1506,13 @@ public class TestReplicationScenarios {
 
     Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
+    //Verify dump data structure
+    Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf);
+    verifyDataFileExist(fs, hiveDumpDir, null, new Path(unptnLocn).getName());
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, null);
+
+
     verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
 
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -1526,7 +1533,13 @@ public class TestReplicationScenarios {
             + ".ptned WHERE b=2", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
 
-    incrementalLoadAndVerify(dbName, replDbName);
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+    hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    verifyDataFileExist(fs, hiveDumpDir, "b=1", new Path(ptnLocn1).getName());
+    verifyDataFileExist(fs, hiveDumpDir, "b=2", new Path(ptnLocn2).getName());
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=1");
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=2");
+
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -3788,6 +3801,43 @@ public class TestReplicationScenarios {
     assertFalse(success);
   }
 
+  private void verifyDataFileExist(FileSystem fs, Path hiveDumpDir, String part, String dataFile) throws IOException {
+    FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+    boolean dataFileFound = false;
+    for (FileStatus eventFileStatus: eventFileStatuses) {
+      String dataRelativePath = null;
+      if (part == null) {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + dataFile;
+      } else {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + part + File.separator + dataFile;
+      }
+      if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+        dataFileFound = true;
+        break;
+      }
+    }
+    assertTrue(dataFileFound);
+  }
+
+  private void verifyDataListFileDoesNotExist(FileSystem fs, Path hiveDumpDir, String part)
+          throws IOException {
+    FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+    boolean dataListFileFound = false;
+    for (FileStatus eventFileStatus: eventFileStatuses) {
+      String dataRelativePath = null;
+      if (part == null) {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + EximUtil.FILES_NAME;
+      } else {
+        dataRelativePath = part + File.separator + EximUtil.FILES_NAME;
+      }
+      if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+        dataListFileFound = true;
+        break;
+      }
+    }
+    assertFalse(dataListFileFound);
+  }
+
   private void verifyRunWithPatternMatch(String cmd, String key, String pattern, IDriver myDriver) throws IOException {
     run(cmd, myDriver);
     List<String> results = getOutput(myDriver);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 33124c8..e1b8b81 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -925,7 +925,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     Path path = new Path(hiveDumpDir);
     FileSystem fs = path.getFileSystem(conf);
     FileStatus[] fileStatus = fs.listStatus(path);
-    int numEvents = fileStatus.length - 3; //one is metadata file and one data dir and one is _dump ack
+    int numEvents = fileStatus.length - 2; //one is metadata file and one is _dump ack
 
     replica.load(replicatedDbName, primaryDbName,
         Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3c7274c..51c3b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -102,35 +102,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     }
   }
 
-  private void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
-          throws IOException {
-    for (ReplChangeManager.FileInfo srcFile : srcFiles) {
-      if (srcFile.isUseSourcePath()) {
-        continue;
-      }
-      String destFileName = srcFile.getCmPath().getName();
-      Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
-      Path destFile = new Path(destRoot, destFileName);
-      if (dstFs.exists(destFile)) {
-        String destFileWithSourceName = srcFile.getSourcePath().getName();
-        Path newDestFile = new Path(destRoot, destFileWithSourceName);
-
-        // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
-        // directly to table path (bypassing staging directory) then there might be some stale files from previous
-        // incomplete/failed load. No need of recycle as this is a case of stale file.
-        if (dstFs.exists(newDestFile)) {
-          LOG.debug(" file " + newDestFile + " is deleted before renaming");
-          dstFs.delete(newDestFile, true);
-        }
-        boolean result = dstFs.rename(destFile, newDestFile);
-        if (!result) {
-          throw new IllegalStateException(
-                  "could not rename " + destFile.getName() + " to " + newDestFile.getName());
-        }
-      }
-    }
-  }
-
   @Override
   public int execute() {
     LOG.debug("ReplCopyTask.execute()");
@@ -190,32 +161,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         }
 
         if (work.isCopyToMigratedTxnTable()) {
-          if (work.isNeedCheckDuplicateCopy()) {
-            updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
-                    ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
-            if (srcFiles.isEmpty()) {
-              LOG.info("All files are already present in the base directory. Skipping copy task.");
-              return 0;
-            }
+          if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+            return 0;
           }
-          // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
-          // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
-          // The toPath received in ReplCopyWork is pointing to table/partition base location.
-          // So, just need to append the base or delta directory.
-          // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
-          // hence need to create base directory. If false, then it is repl load for regular insert into or
-          // load flow and hence just create delta directory.
-          Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
-          if (writeId == null) {
+
+          Path modifiedToPath = getModifiedToPath(toPath);
+          if (modifiedToPath == null) {
             console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
             return 6;
           }
-          // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
-          // duplicate copy from the source. Check HIVE-21197 for more detail.
-          int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
-                  ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
-                    context.getHiveTxnManager().getStmtIdAndIncrement();
-          toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+          toPath = modifiedToPath;
         }
       } else {
         // This flow is usually taken for IMPORT command
@@ -231,12 +186,22 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
             return 0;
           }
         }
-
         for (FileStatus oneSrc : srcs) {
           console.printInfo("Copying file: " + oneSrc.getPath().toString());
           LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
-          srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
-                                                      oneSrc.getPath(), null));
+          srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null));
+        }
+        if (work.isCopyToMigratedTxnTable()) {
+          if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+            return 0;
+          }
+
+          Path modifiedToPath = getModifiedToPath(toPath);
+          if (modifiedToPath == null) {
+            console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
+            return 6;
+          }
+          toPath = modifiedToPath;
         }
       }
 
@@ -255,12 +220,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         return 2;
       }
       // Copy the files from different source file systems to one destination directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath);
+      CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath);
 
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
       // where the first event refers to source path and  second event refers to CM path
-      renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
+      copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
       return 0;
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -269,6 +235,38 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     }
   }
 
+  private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles)
+          throws IOException {
+    if (work.isNeedCheckDuplicateCopy()) {
+      updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
+              ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
+      if (srcFiles.isEmpty()) {
+        LOG.info("All files are already present in the base directory. Skipping copy task.");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Path getModifiedToPath(Path toPath) {
+    // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
+    // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
+    // The toPath received in ReplCopyWork is pointing to table/partition base location.
+    // So, just need to append the base or delta directory.
+    // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
+    // hence need to create base directory. If false, then it is repl load for regular insert into or
+    // load flow and hence just create delta directory.
+    Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+    if (writeId == null) {
+      return null;
+    }
+    // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
+    // duplicate copy from the source. Check HIVE-21197 for more detail.
+    int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
+            ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
+            context.getHiveTxnManager().getStmtIdAndIncrement();
+    return new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+  }
   private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath)
       throws IOException {
     Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7354a3e..80515e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -77,6 +77,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -505,9 +506,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-              isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+              isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
     } else {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
@@ -597,6 +597,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
+      if (replicationSpec.isInReplicationScope()
+          && !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
+        Path partLocation = new Path(partSpec.getLocation());
+        Path dataDirBase = partLocation.getParent();
+        String bucketDir = partLocation.getName();
+        for (int i=1; i<partSpec.getPartSpec().size(); i++) {
+          bucketDir =  dataDirBase.getName() + File.separator + bucketDir;
+          dataDirBase = dataDirBase.getParent();
+        }
+        String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
+        srcLocation =  new Path(dataDirBase, relativePartDataPath).toString();
+      }
       fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
               + partSpecToString(partSpec.getPartSpec())
@@ -641,9 +653,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
         copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
-                x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+                x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
       } else {
         copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index fd70260..23cd128 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -232,6 +232,39 @@ public class CopyUtils {
     return pathList;
   }
 
+  /* A file copied from the CM path must be renamed to its original source file name.
+  This is needed to avoid having duplicate files in the target if the same event is applied twice,
+  where the first event refers to the source path and the second event refers to the CM path. */
+
+  public void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
+          throws IOException {
+    for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+      if (srcFile.isUseSourcePath()) {
+        continue;
+      }
+      String destFileName = srcFile.getCmPath().getName();
+      Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+      Path destFile = new Path(destRoot, destFileName);
+      if (dstFs.exists(destFile)) {
+        String destFileWithSourceName = srcFile.getSourcePath().getName();
+        Path newDestFile = new Path(destRoot, destFileWithSourceName);
+
+        // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
+        // directly to table path (bypassing staging directory) then there might be some stale files from previous
+        // incomplete/failed load. No need of recycle as this is a case of stale file.
+        if (dstFs.exists(newDestFile)) {
+          LOG.debug(" file " + newDestFile + " is deleted before renaming");
+          dstFs.delete(newDestFile, true);
+        }
+        boolean result = dstFs.rename(destFile, newDestFile);
+        if (!result) {
+          throw new IllegalStateException(
+                  "could not rename " + destFile.getName() + " to " + newDestFile.getName());
+        }
+      }
+    }
+  }
+
   // Check if the source file unmodified even after copy to see if we copied the right file
   private boolean isSourceFileMismatch(FileSystem sourceFs, ReplChangeManager.FileInfo srcFile) throws IOException {
     // If source is already CM path, the checksum will be always matching
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 049c06b..fed0d50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
@@ -37,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -89,42 +88,28 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
     return event.getEventId();
   }
 
-  protected void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter,
-                                Context withinContext)
+  protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext)
           throws IOException, LoginException, MetaException, HiveFatalException {
     HiveConf hiveConf = withinContext.hiveConf;
     String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     if (!Utils.shouldDumpMetaDataOnly(withinContext.hiveConf)) {
-      Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME);
-      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      if (table.isPartitioned()) {
+        dataPath = new Path(dataPath, ptn.getName());
+      }
       String[] decodedURISplits = ReplChangeManager.decodeFileUri(file);
-      String srcDataFile = decodedURISplits[0];
-      Path srcDataPath = new Path(srcDataFile);
+      Path srcDataPath = new Path(decodedURISplits[0]);
       if (dataPath.toUri().getScheme() == null) {
         dataPath = new Path(srcDataPath.toUri().getScheme(), srcDataPath.toUri().getAuthority(), dataPath.toString());
       }
-      String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName();
-      String srcDataFileRelativePath = null;
-      if (srcDataFile.contains(table.getPath().toString())) {
-        srcDataFileRelativePath = srcDataFile.substring(table.getPath().toString().length() + 1);
-      } else if (decodedURISplits[3] == null) {
-        srcDataFileRelativePath = srcDataPath.getName();
-      } else {
-        srcDataFileRelativePath = srcDataFileRelativePath + File.separator + srcDataPath.getName();
-      }
-      Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath);
-      String encodedTargetPath = ReplChangeManager.encodeFileUri(
-              targetPath.toString(), decodedURISplits[1], decodedURISplits[3]);
-      ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
                   decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
-      filePaths.add(f);
-      FileSystem dstFs = targetPath.getFileSystem(hiveConf);
-      Path finalTargetPath = targetPath.getParent();
-      if (decodedURISplits[3] != null) {
-        finalTargetPath = finalTargetPath.getParent();
-      }
-      new CopyUtils(distCpDoAsUser, hiveConf, dstFs).copyAndVerify(finalTargetPath, filePaths, srcDataPath);
-      fileListWriter.write(encodedTargetPath + "\n");
+      filePaths.add(fileInfo);
+      FileSystem dstFs = dataPath.getFileSystem(hiveConf);
+      CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath);
+      copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index a06b90d..8506532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -108,10 +108,8 @@ class AddPartitionHandler extends AbstractEventHandler {
         Iterable<String> files = partitionFilesIter.next().getFiles();
         if (files != null) {
           // encoded filename/checksum of files, write into _files
-          try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
-            for (String file : files) {
-              writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-            }
+          for (String file : files) {
+            writeFileEntry(qlMdTable, qlPtn, file, withinContext);
           }
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index dc87506..36369db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -39,12 +39,9 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.fs.FileSystem;
 
 import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -59,19 +56,11 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
     return deserializer.getCommitTxnMessage(stringRepresentation);
   }
 
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable<String> files, Path dataPath)
+  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext)
           throws IOException, LoginException, MetaException, HiveFatalException {
     // encoded filename/checksum of files, write into _files
-    try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-      for (String file : files) {
-        writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-      }
+    for (String file : files) {
+      writeFileEntry(qlMdTable, ptn, file, withinContext);
     }
   }
 
@@ -92,12 +81,10 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
             withinContext.hiveConf);
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
-      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath);
+      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
-        Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName());
-        writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath);
+        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 7a6ddf9..c732b21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -79,14 +79,11 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
         withinContext.replicationSpec,
         withinContext.hiveConf);
 
-    Path dataPath = new Path(withinContext.eventRoot, "data");
     Iterable<String> files = eventMessage.getFiles();
     if (files != null) {
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, null, file, withinContext);
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 4e02620..701dd6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -80,25 +80,16 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    /*
+     * An insert into/overwrite operation may operate on one or more partitions, or even partitions of multiple tables.
+     * However, an Insert event is generated for each partition into which the data is inserted,
+     * so the qlPtns list will have only one entry.
+     */
+    Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      Path dataPath;
-      if ((null == qlPtns) || qlPtns.isEmpty()) {
-        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      } else {
-        /*
-         * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
-         * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
-         * will have only one entry.
-         */
-        assert(1 == qlPtns.size());
-        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
-      }
-
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, ptn, file, withinContext);
       }
     }