You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2019/03/21 14:58:10 UTC

[hive] branch master updated: HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 38682a4  HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
38682a4 is described below

commit 38682a414708d810937012bae4b0c97deca5ef07
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
AuthorDate: Thu Mar 21 20:27:38 2019 +0530

    HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
---
 .../org/apache/hadoop/hive/common/FileUtils.java   |  10 +-
 .../apache/hadoop/hive/common/TestFileUtils.java   |  11 +-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   |   2 +-
 .../ql/exec/repl/ExternalTableCopyTaskBuilder.java |  86 ++++++++----
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       | 150 ++++++++++++++-------
 .../hive/ql/parse/repl/dump/io/FileOperations.java |   5 +-
 .../hadoop/hive/ql/parse/repl/TestCopyUtils.java   |   8 +-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java    |   5 +-
 .../org/apache/hadoop/hive/shims/HadoopShims.java  |   5 +-
 9 files changed, 182 insertions(+), 100 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 23a3a6b..8b03faa 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -636,18 +636,18 @@ public final class FileUtils {
   }
 
   public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
-      boolean deleteSource, String doAsUser,
+      boolean deleteSource, UserGroupInformation proxyUser,
       HiveConf conf, HadoopShims shims) throws IOException {
     LOG.debug("copying srcPaths : {}, to DestPath :{} ,with doAs: {}",
-        StringUtils.join(",", srcPaths), dst.toString(), doAsUser);
+        StringUtils.join(",", srcPaths), dst.toString(), proxyUser);
     boolean copied = false;
-    if (doAsUser == null){
+    if (proxyUser == null){
       copied = shims.runDistCp(srcPaths, dst, conf);
     } else {
-      copied = shims.runDistCpAs(srcPaths, dst, conf, doAsUser);
+      copied = shims.runDistCpAs(srcPaths, dst, conf, proxyUser);
     }
     if (copied && deleteSource) {
-      if (doAsUser != null) {
+      if (proxyUser != null) {
         // if distcp is done using doAsUser, delete also should be done using same user.
         //TODO : Need to change the delete execution within doAs if doAsUser is given.
         throw new IOException("Distcp is called with doAsUser and delete source set as true");
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index b45832e..9b5748e 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.HadoopShims;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -239,14 +240,16 @@ public class TestFileUtils {
     FileSystem fs = copySrc.getFileSystem(conf);
 
     String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+            doAsUser, UserGroupInformation.getLoginUser());
 
     HadoopShims shims = mock(HadoopShims.class);
-    when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser)).thenReturn(true);
+    when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser)).thenReturn(true);
     when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(false);
 
     // doAs when asked
-    Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, doAsUser, conf, shims));
-    verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser);
+    Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, proxyUser, conf, shims));
+    verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser);
     // don't doAs when not asked
     Assert.assertFalse(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, null, conf, shims));
     verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf);
@@ -254,7 +257,7 @@ public class TestFileUtils {
     // When distcp is done with doAs, the delete should also be done as doAs. But in current code its broken. This
     // should be fixed. For now check is added to avoid wrong usage. So if doAs is set, delete source should be false.
     try {
-      FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, doAsUser, conf, shims);
+      FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, proxyUser, conf, shims);
       Assert.assertTrue("Should throw IOException as doAs is called with delete source set to true".equals(""));
     } catch (IOException e) {
       Assert.assertTrue(e.getMessage().
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 55a0c1f..c34f075 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -257,7 +257,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         return 2;
       }
       // Copy the files from different source file systems to one destination directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(dstFs, toPath, srcFiles);
+      new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles);
 
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index d7eed2c..6bc3cd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -90,7 +90,7 @@ public class ExternalTableCopyTaskBuilder {
       } catch (FileNotFoundException e) {
         // Don't delete target path created else ddl task will try to create it using user hive and may fail.
         LOG.warn("source path missing " + sourcePath);
-        return false;
+        return createdDir;
       }
       LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
               destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
@@ -99,53 +99,65 @@ public class ExternalTableCopyTaskBuilder {
       return createdDir;
     }
 
-    private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser)
-            throws IOException {
-      if (distCpDoAsUser == null) {
+    private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser)
+            throws IOException, InterruptedException {
+      if (proxyUser == null) {
         return createAndSetPathOwner(targetPath, sourcePath);
       }
-      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
-              distCpDoAsUser, UserGroupInformation.getLoginUser());
-      try {
-        Path finalTargetPath = targetPath;
-        Path finalSourcePath = sourcePath;
-        return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
-                createAndSetPathOwner(finalTargetPath, finalSourcePath));
-      } catch (InterruptedException e) {
-        throw new IOException(e);
+      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+                createAndSetPathOwner(targetPath, sourcePath));
+    }
+
+    private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception {
+      if (proxyUser == null) {
+        return sourcePath.getFileSystem(conf).exists(sourcePath);
       }
+      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+              sourcePath.getFileSystem(conf).exists(sourcePath));
     }
 
-    private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) {
+    private int handleException(Exception e, Path sourcePath, Path targetPath,
+                                int currentRetry, UserGroupInformation proxyUser) {
       try {
-        if (!sourcePath.getFileSystem(conf).exists(sourcePath)) {
-          LOG.warn("Source path missing " + sourcePath, e);
+        LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
+        if (!checkIfPathExist(sourcePath, proxyUser)) {
+          LOG.info("Source path is missing. Ignoring exception.");
           return 0;
         }
       } catch (Exception ex) {
-        LOG.warn("Source path missing check failed" + sourcePath, ex);
+        LOG.warn("Source path missing check failed. ", ex);
+      }
+
+      // retry logic only for i/o exception
+      if (!(e instanceof IOException)) {
+        LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
+        setException(e);
+        return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
       }
 
       if (currentRetry <= MAX_COPY_RETRY) {
-        LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e);
+        LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
       } else {
-        LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+        LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
         setException(e);
-        return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+        return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
       }
 
       int sleepTime = FileUtils.getSleepTime(currentRetry);
-      LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+      LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
       try {
         Thread.sleep(sleepTime);
       } catch (InterruptedException timerEx) {
-        LOG.info("sleep interrupted", timerEx.getMessage());
+        LOG.info("Sleep interrupted", timerEx.getMessage());
       }
 
       try {
-        FileSystem.closeAllForUGI(Utils.getUGI());
+        if (proxyUser == null) {
+          proxyUser = Utils.getUGI();
+        }
+        FileSystem.closeAllForUGI(proxyUser);
       } catch (Exception ex) {
-        LOG.error("unable to closeAllForUGI", ex);
+        LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
       }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
@@ -162,14 +174,17 @@ public class ExternalTableCopyTaskBuilder {
       }
       int currentRetry = 0;
       int error = 0;
+      UserGroupInformation proxyUser = null;
       while (currentRetry <= MAX_COPY_RETRY) {
         try {
           UserGroupInformation ugi = Utils.getUGI();
           String currentUser = ugi.getShortUserName();
-          boolean usePrivilegedUser =
-              distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+          if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+            proxyUser = UserGroupInformation.createProxyUser(
+                    distCpDoAsUser, UserGroupInformation.getLoginUser());
+          }
 
-          setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null);
+          setTargetPathOwner(targetPath, sourcePath, proxyUser);
 
           // do we create a new conf and only here provide this additional option so that we get away from
           // differences of data in two location for the same directories ?
@@ -179,16 +194,29 @@ public class ExternalTableCopyTaskBuilder {
               Collections.singletonList(sourcePath),  // list of source paths
               targetPath,
               false,
-              usePrivilegedUser ? distCpDoAsUser : null,
+              proxyUser,
               conf,
               ShimLoader.getHadoopShims());
           return 0;
         } catch (Exception e) {
           currentRetry++;
-          error = handleException(e, sourcePath, targetPath, currentRetry);
+          error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
           if (error == 0) {
             return 0;
           }
+        } finally {
+          if (proxyUser != null) {
+            try {
+              FileSystem.closeAllForUGI(proxyUser);
+            } catch (IOException e) {
+              LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
+              if (error == 0) {
+                setException(e);
+                error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+              }
+              break;
+            }
+          }
         }
       }
       return error;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 686fe7b..73c863e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -50,59 +50,70 @@ public class CopyUtils {
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
   // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
   public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
-  private static final int MAX_COPY_RETRY = 5;
+  private static final int MAX_IO_RETRY = 5;
 
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
+  private FileSystem destinationFs;
 
-  public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
+  public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
     hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
     this.copyAsUser = distCpDoAsUser;
+    this.destinationFs = destinationFs;
   }
 
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(FileSystem destinationFs, Path destRoot,
+  public void copyAndVerify(Path destRoot,
                     List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException, HiveFatalException {
     Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
-    for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
-      FileSystem sourceFs = entry.getKey();
-      Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
-      for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
-        Path destination = destMapEntry.getKey();
-        List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
-        boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
-
-        if (!destinationFs.exists(destination)
-                && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
-          LOG.error("Failed to create destination directory: " + destination);
-          throw new IOException("Destination directory creation failed");
-        }
+    UserGroupInformation proxyUser = getProxyUser();
+    try {
+      for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
+        Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
+        for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+          Path destination = destMapEntry.getKey();
+          List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+          // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
+          // For instance, doCopyRetry closes the file system in case of i/o exceptions.
+          FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
+          boolean useRegularCopy = regularCopy(sourceFs, fileInfoList);
+
+          if (!destinationFs.exists(destination)
+                  && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+            LOG.error("Failed to create destination directory: " + destination);
+            throw new IOException("Destination directory creation failed");
+          }
 
-		    // Copy files with retry logic on failure or source file is dropped or changed.
-        doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
+          // Copy files with retry logic on failure or source file is dropped or changed.
+          doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy);
+        }
+      }
+    } finally {
+      if (proxyUser != null) {
+        FileSystem.closeAllForUGI(proxyUser);
       }
     }
   }
 
   private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
-                           FileSystem destinationFs, Path destination,
+                           Path destination, UserGroupInformation proxyUser,
                            boolean useRegularCopy) throws IOException, LoginException, HiveFatalException {
     int repeat = 0;
     boolean isCopyError = false;
     List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath);
-    while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
+    while (!pathList.isEmpty() && (repeat < MAX_IO_RETRY)) {
       try {
         // if its retrying, first regenerate the path list.
         if (repeat > 0) {
-          pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError);
+          pathList = getFilesToRetry(sourceFs, srcFileList, destination, isCopyError);
           if (pathList.isEmpty()) {
             // all files were copied successfully in last try. So can break from here.
             break;
@@ -113,7 +124,7 @@ public class CopyUtils {
 
         // if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry.
         isCopyError = true;
-        doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy);
+        doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser);
 
         // if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry.
         isCopyError = false;
@@ -121,7 +132,7 @@ public class CopyUtils {
         // If copy fails, fall through the retry logic
         LOG.info("file operation failed", e);
 
-        if (repeat >= (MAX_COPY_RETRY - 1)) {
+        if (repeat >= (MAX_IO_RETRY - 1)) {
           //no need to wait in the last iteration
           break;
         }
@@ -136,7 +147,11 @@ public class CopyUtils {
           }
 
           // looks like some network outrage, reset the file system object and retry.
-          FileSystem.closeAllForUGI(Utils.getUGI());
+          if (proxyUser == null) {
+            FileSystem.closeAllForUGI(Utils.getUGI());
+          } else {
+            FileSystem.closeAllForUGI(proxyUser);
+          }
           sourceFs = pathList.get(0).getFileSystem(hiveConf);
           destinationFs = destination.getFileSystem(hiveConf);
         }
@@ -155,7 +170,7 @@ public class CopyUtils {
   // If yes, then add to the retry list. If source file missing, then retry with CM path. if CM path
   // itself is missing, then throw error.
   private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
-                                     FileSystem destinationFs, Path destination, boolean isCopyError)
+                                     Path destination, boolean isCopyError)
           throws IOException, HiveFatalException {
     List<Path> pathList = new ArrayList<Path>();
 
@@ -238,23 +253,54 @@ public class CopyUtils {
     return false;
   }
 
+  private UserGroupInformation getProxyUser() throws LoginException, IOException {
+    if (copyAsUser == null) {
+      return null;
+    }
+    UserGroupInformation proxyUser = null;
+    int currentRetry = 0;
+    while (currentRetry <= MAX_IO_RETRY) {
+      try {
+        UserGroupInformation ugi = Utils.getUGI();
+        String currentUser = ugi.getShortUserName();
+        if (!currentUser.equals(copyAsUser)) {
+          proxyUser = UserGroupInformation.createProxyUser(
+                  copyAsUser, UserGroupInformation.getLoginUser());
+        }
+        return proxyUser;
+      } catch (IOException e) {
+        currentRetry++;
+        if (currentRetry <= MAX_IO_RETRY) {
+          LOG.warn("Unable to get UGI info", e);
+        } else {
+          LOG.error("Unable to get UGI info", e);
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+        int sleepTime = FileUtils.getSleepTime(currentRetry);
+        LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          LOG.info("Sleep interrupted", timerEx.getMessage());
+        }
+      }
+    }
+    return null;
+  }
+
   // Copy without retry
   private void doCopyOnce(FileSystem sourceFs, List<Path> srcList,
-                          FileSystem destinationFs, Path destination,
-                          boolean useRegularCopy) throws IOException, LoginException {
-    UserGroupInformation ugi = Utils.getUGI();
-    String currentUser = ugi.getShortUserName();
-    boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser);
-
+                          Path destination,
+                          boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException {
     if (useRegularCopy) {
-      doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, usePrivilegedUser);
+      doRegularCopyOnce(sourceFs, srcList, destination, proxyUser);
     } else {
-      doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser);
+      doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser);
     }
   }
 
   private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
-      boolean usePrivilegedUser) throws IOException {
+                                UserGroupInformation proxyUser) throws IOException {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
       srcList = srcList.stream().map(path -> {
         URI uri = path.toUri();
@@ -271,7 +317,7 @@ public class CopyUtils {
         srcList,  // list of source paths
         destination,
         false,
-        usePrivilegedUser ? copyAsUser : null,
+        proxyUser,
         hiveConf,
         ShimLoader.getHadoopShims())) {
       LOG.error("Distcp failed to copy files: " + srcList + " to destination: " + destination);
@@ -279,17 +325,15 @@ public class CopyUtils {
     }
   }
 
-  private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs,
-      Path destination, boolean usePrivilegedUser) throws IOException {
+  private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
+      Path destination, UserGroupInformation proxyUser) throws IOException {
   /*
     even for regular copy we have to use the same user permissions that distCp will use since
     hive-server user might be different that the super user required to copy relevant files.
    */
     final Path[] paths = srcList.toArray(new Path[] {});
-    if (usePrivilegedUser) {
+    if (proxyUser != null) {
       final Path finalDestination = destination;
-      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
-          copyAsUser, UserGroupInformation.getLoginUser());
       try {
         proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
           FileUtil
@@ -306,15 +350,21 @@ public class CopyUtils {
 
   public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
     Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
-    FileSystem destinationFs = destination.getFileSystem(hiveConf);
-
-    for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
-      final FileSystem sourceFs = entry.getKey();
-      List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
-              path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
-      doCopyOnce(sourceFs, entry.getValue(),
-                 destinationFs, destination,
-                 regularCopy(destinationFs, sourceFs, fileList));
+
+    UserGroupInformation proxyUser = getProxyUser();
+    try {
+      for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
+        final FileSystem sourceFs = entry.getKey();
+        List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
+           path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
+        doCopyOnce(sourceFs, entry.getValue(),
+                destination,
+                regularCopy(sourceFs, fileList), proxyUser);
+      }
+    } finally {
+      if (proxyUser != null) {
+        FileSystem.closeAllForUGI(proxyUser);
+      }
     }
   }
 
@@ -325,7 +375,7 @@ public class CopyUtils {
       3. aggregate fileSize of all source Paths(can be directory /  file) is less than configured size.
       4. number of files of all source Paths(can be directory /  file) is less than configured size.
   */
-  boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList)
+  boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList)
       throws IOException {
     if (hiveInTest) {
       return true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index e8eaae6..fc5419c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -103,7 +103,7 @@ public class FileOperations {
       srcPaths.add(fileStatus.getPath());
     }
 
-    new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths);
+    new CopyUtils(distCpDoAsUser, hiveConf, toPath.getFileSystem(hiveConf)).doCopy(toPath, srcPaths);
   }
 
   private void copyMmPath() throws LoginException, IOException {
@@ -135,7 +135,8 @@ public class FileOperations {
         }
         Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}",
             dirWithOriginals, exportRootDataDir);
-        new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
+        new CopyUtils(distCpDoAsUser, hiveConf, exportRootDataDir.getFileSystem(hiveConf)).
+                doCopy(exportRootDataDir, srcPaths);
       }
     }
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
index 7bd660b..610af09 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
@@ -65,7 +65,7 @@ public class TestCopyUtils {
 
     HiveConf conf = Mockito.spy(new HiveConf());
     doReturn(1L).when(conf).getLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 32L * 1024 * 1024);
-    CopyUtils copyUtils = new CopyUtils("", conf);
+    CopyUtils copyUtils = new CopyUtils("", conf, null);
     long MB_128 = 128 * 1024 * 1024;
     assertFalse(copyUtils.limitReachedForLocalCopy(MB_128, 1L));
   }
@@ -76,7 +76,7 @@ public class TestCopyUtils {
     when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
 
     HiveConf conf = Mockito.spy(new HiveConf());
-    CopyUtils copyUtils = new CopyUtils("", conf);
+    CopyUtils copyUtils = new CopyUtils("", conf, null);
     long MB_16 = 16 * 1024 * 1024;
     assertFalse(copyUtils.limitReachedForLocalCopy(MB_16, 100L));
   }
@@ -88,7 +88,7 @@ public class TestCopyUtils {
     FileSystem fs = mock(FileSystem.class);
     List<Path> srcPaths = Arrays.asList(source, source);
     HiveConf conf = mock(HiveConf.class);
-    CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf));
+    CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf, fs));
 
     mockStatic(FileUtils.class);
     mockStatic(Utils.class);
@@ -99,7 +99,7 @@ public class TestCopyUtils {
                           same(ShimLoader.getHadoopShims())))
         .thenReturn(false);
     when(Utils.getUGI()).thenReturn(mock(UserGroupInformation.class));
-    doReturn(false).when(copyUtils).regularCopy(same(fs), same(fs), anyListOf(ReplChangeManager.FileInfo.class));
+    doReturn(false).when(copyUtils).regularCopy(same(fs), anyListOf(ReplChangeManager.FileInfo.class));
 
     copyUtils.doCopy(destination, srcPaths);
   }
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e774419..9a1e590 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1142,9 +1142,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   }
 
   @Override
-  public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException {
-    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
-        doAsUser, UserGroupInformation.getLoginUser());
+  public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf,
+                             UserGroupInformation proxyUser) throws IOException {
     try {
       return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
         @Override
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c569b24..49a2ab3 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -500,10 +500,11 @@ public interface HadoopShims {
    * @param srcPaths List of Path to the source files or directories to copy
    * @param dst Path to the destination file or directory
    * @param conf The hadoop configuration object
-   * @param doAsUser The user to perform the distcp as
+   * @param proxyUser The user to perform the distcp as
    * @return True if it is successfull; False otherwise.
    */
-  public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException;
+  boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, UserGroupInformation proxyUser)
+          throws IOException;
 
   /**
    * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.