Posted to commits@hive.apache.org by ma...@apache.org on 2019/03/21 14:58:10 UTC
[hive] branch master updated: HIVE-21446 : Hive Server going OOM
during hive external table replications. (Mahesh Kumar Behera,
reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 38682a4 HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
38682a4 is described below
commit 38682a414708d810937012bae4b0c97deca5ef07
Author: Mahesh Kumar Behera <mb...@hortonworks.com>
AuthorDate: Thu Mar 21 20:27:38 2019 +0530
HIVE-21446 : Hive Server going OOM during hive external table replications. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
---
.../org/apache/hadoop/hive/common/FileUtils.java | 10 +-
.../apache/hadoop/hive/common/TestFileUtils.java | 11 +-
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 2 +-
.../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 86 ++++++++----
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 150 ++++++++++++++-------
.../hive/ql/parse/repl/dump/io/FileOperations.java | 5 +-
.../hadoop/hive/ql/parse/repl/TestCopyUtils.java | 8 +-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 5 +-
.../org/apache/hadoop/hive/shims/HadoopShims.java | 5 +-
9 files changed, 182 insertions(+), 100 deletions(-)
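In outline, the hunks below replace the per-call creation of a proxy UserGroupInformation (doRegularCopyOnce and runDistCpAs each built a fresh UGI for every copied batch) with a single proxy UGI that is reused for the whole copy and released with FileSystem.closeAllForUGI when the copy finishes. Hadoop's FileSystem cache keys on the UGI, so each throwaway proxy UGI left another set of cached FileSystem objects on the HiveServer2 heap, which lines up with the OOM reported in HIVE-21446. A minimal sketch of that reuse-and-release pattern, using only the standard Hadoop UserGroupInformation/FileSystem APIs; copyWithProxyUser and runCopy are illustrative placeholders, not names from the patch:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical helper, not part of the patch: create the proxy UGI once,
    // run the copy as that user, then drop the FileSystem handles cached for it.
    static void copyWithProxyUser(String copyAsUser, PrivilegedExceptionAction<Boolean> runCopy)
        throws Exception {
      UserGroupInformation proxyUser = null;
      try {
        String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
        if (copyAsUser != null && !currentUser.equals(copyAsUser)) {
          // One proxy UGI for the whole copy, not one per file batch.
          proxyUser = UserGroupInformation.createProxyUser(
              copyAsUser, UserGroupInformation.getLoginUser());
        }
        if (proxyUser != null) {
          proxyUser.doAs(runCopy);   // distcp / regular copy executed as the privileged user
        } else {
          runCopy.run();
        }
      } finally {
        if (proxyUser != null) {
          // Release the FileSystem instances cached against this UGI so repeated
          // external table copies do not accumulate objects on the HiveServer2 heap.
          FileSystem.closeAllForUGI(proxyUser);
        }
      }
    }

In the patch itself this shows up as the new UserGroupInformation parameter on FileUtils.distCp and HadoopShims.runDistCpAs, and as the finally blocks added in ExternalTableCopyTaskBuilder and CopyUtils that call FileSystem.closeAllForUGI(proxyUser).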
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 23a3a6b..8b03faa 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -636,18 +636,18 @@ public final class FileUtils {
}
public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
- boolean deleteSource, String doAsUser,
+ boolean deleteSource, UserGroupInformation proxyUser,
HiveConf conf, HadoopShims shims) throws IOException {
LOG.debug("copying srcPaths : {}, to DestPath :{} ,with doAs: {}",
- StringUtils.join(",", srcPaths), dst.toString(), doAsUser);
+ StringUtils.join(",", srcPaths), dst.toString(), proxyUser);
boolean copied = false;
- if (doAsUser == null){
+ if (proxyUser == null){
copied = shims.runDistCp(srcPaths, dst, conf);
} else {
- copied = shims.runDistCpAs(srcPaths, dst, conf, doAsUser);
+ copied = shims.runDistCpAs(srcPaths, dst, conf, proxyUser);
}
if (copied && deleteSource) {
- if (doAsUser != null) {
+ if (proxyUser != null) {
// if distcp is done using doAsUser, delete also should be done using same user.
//TODO : Need to change the delete execution within doAs if doAsUser is given.
throw new IOException("Distcp is called with doAsUser and delete source set as true");
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index b45832e..9b5748e 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
@@ -239,14 +240,16 @@ public class TestFileUtils {
FileSystem fs = copySrc.getFileSystem(conf);
String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+ doAsUser, UserGroupInformation.getLoginUser());
HadoopShims shims = mock(HadoopShims.class);
- when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser)).thenReturn(true);
+ when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser)).thenReturn(true);
when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(false);
// doAs when asked
- Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, doAsUser, conf, shims));
- verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser);
+ Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, false, proxyUser, conf, shims));
+ verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, proxyUser);
// don't doAs when not asked
Assert.assertFalse(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, null, conf, shims));
verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf);
@@ -254,7 +257,7 @@ public class TestFileUtils {
// When distcp is done with doAs, the delete should also be done as doAs. But in current code its broken. This
// should be fixed. For now check is added to avoid wrong usage. So if doAs is set, delete source should be false.
try {
- FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, doAsUser, conf, shims);
+ FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, proxyUser, conf, shims);
Assert.assertTrue("Should throw IOException as doAs is called with delete source set to true".equals(""));
} catch (IOException e) {
Assert.assertTrue(e.getMessage().
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 55a0c1f..c34f075 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -257,7 +257,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
return 2;
}
// Copy the files from different source file systems to one destination directory
- new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(dstFs, toPath, srcFiles);
+ new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles);
// If a file is copied from CM path, then need to rename them using original source file name
// This is needed to avoid having duplicate files in target if same event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index d7eed2c..6bc3cd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -90,7 +90,7 @@ public class ExternalTableCopyTaskBuilder {
} catch (FileNotFoundException e) {
// Don't delete target path created else ddl task will try to create it using user hive and may fail.
LOG.warn("source path missing " + sourcePath);
- return false;
+ return createdDir;
}
LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
@@ -99,53 +99,65 @@ public class ExternalTableCopyTaskBuilder {
return createdDir;
}
- private boolean setTargetPathOwner(Path targetPath, Path sourcePath, String distCpDoAsUser)
- throws IOException {
- if (distCpDoAsUser == null) {
+ private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser)
+ throws IOException, InterruptedException {
+ if (proxyUser == null) {
return createAndSetPathOwner(targetPath, sourcePath);
}
- UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
- distCpDoAsUser, UserGroupInformation.getLoginUser());
- try {
- Path finalTargetPath = targetPath;
- Path finalSourcePath = sourcePath;
- return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
- createAndSetPathOwner(finalTargetPath, finalSourcePath));
- } catch (InterruptedException e) {
- throw new IOException(e);
+ return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+ createAndSetPathOwner(targetPath, sourcePath));
+ }
+
+ private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception {
+ if (proxyUser == null) {
+ return sourcePath.getFileSystem(conf).exists(sourcePath);
}
+ return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
+ sourcePath.getFileSystem(conf).exists(sourcePath));
}
- private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry) {
+ private int handleException(Exception e, Path sourcePath, Path targetPath,
+ int currentRetry, UserGroupInformation proxyUser) {
try {
- if (!sourcePath.getFileSystem(conf).exists(sourcePath)) {
- LOG.warn("Source path missing " + sourcePath, e);
+ LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
+ if (!checkIfPathExist(sourcePath, proxyUser)) {
+ LOG.info("Source path is missing. Ignoring exception.");
return 0;
}
} catch (Exception ex) {
- LOG.warn("Source path missing check failed" + sourcePath, ex);
+ LOG.warn("Source path missing check failed. ", ex);
+ }
+
+ // retry logic only for i/o exception
+ if (!(e instanceof IOException)) {
+ LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
}
if (currentRetry <= MAX_COPY_RETRY) {
- LOG.warn("unable to copy {} to {}", sourcePath, targetPath, e);
+ LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
} else {
- LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+ LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
setException(e);
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
}
int sleepTime = FileUtils.getSleepTime(currentRetry);
- LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+ LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
try {
Thread.sleep(sleepTime);
} catch (InterruptedException timerEx) {
- LOG.info("sleep interrupted", timerEx.getMessage());
+ LOG.info("Sleep interrupted", timerEx.getMessage());
}
try {
- FileSystem.closeAllForUGI(Utils.getUGI());
+ if (proxyUser == null) {
+ proxyUser = Utils.getUGI();
+ }
+ FileSystem.closeAllForUGI(proxyUser);
} catch (Exception ex) {
- LOG.error("unable to closeAllForUGI", ex);
+ LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
}
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
}
@@ -162,14 +174,17 @@ public class ExternalTableCopyTaskBuilder {
}
int currentRetry = 0;
int error = 0;
+ UserGroupInformation proxyUser = null;
while (currentRetry <= MAX_COPY_RETRY) {
try {
UserGroupInformation ugi = Utils.getUGI();
String currentUser = ugi.getShortUserName();
- boolean usePrivilegedUser =
- distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+ if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+ proxyUser = UserGroupInformation.createProxyUser(
+ distCpDoAsUser, UserGroupInformation.getLoginUser());
+ }
- setTargetPathOwner(targetPath, sourcePath, usePrivilegedUser ? distCpDoAsUser : null);
+ setTargetPathOwner(targetPath, sourcePath, proxyUser);
// do we create a new conf and only here provide this additional option so that we get away from
// differences of data in two location for the same directories ?
@@ -179,16 +194,29 @@ public class ExternalTableCopyTaskBuilder {
Collections.singletonList(sourcePath), // list of source paths
targetPath,
false,
- usePrivilegedUser ? distCpDoAsUser : null,
+ proxyUser,
conf,
ShimLoader.getHadoopShims());
return 0;
} catch (Exception e) {
currentRetry++;
- error = handleException(e, sourcePath, targetPath, currentRetry);
+ error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
if (error == 0) {
return 0;
}
+ } finally {
+ if (proxyUser != null) {
+ try {
+ FileSystem.closeAllForUGI(proxyUser);
+ } catch (IOException e) {
+ LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
+ if (error == 0) {
+ setException(e);
+ error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ break;
+ }
+ }
}
}
return error;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 686fe7b..73c863e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -50,59 +50,70 @@ public class CopyUtils {
private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
// https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
- private static final int MAX_COPY_RETRY = 5;
+ private static final int MAX_IO_RETRY = 5;
private final HiveConf hiveConf;
private final long maxCopyFileSize;
private final long maxNumberOfFiles;
private final boolean hiveInTest;
private final String copyAsUser;
+ private FileSystem destinationFs;
- public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
+ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
this.hiveConf = hiveConf;
maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
this.copyAsUser = distCpDoAsUser;
+ this.destinationFs = destinationFs;
}
// Used by replication, copy files from source to destination. It is possible source file is
// changed/removed during copy, so double check the checksum after copy,
// if not match, copy again from cm
- public void copyAndVerify(FileSystem destinationFs, Path destRoot,
+ public void copyAndVerify(Path destRoot,
List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException, HiveFatalException {
Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
- for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
- FileSystem sourceFs = entry.getKey();
- Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
- for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
- Path destination = destMapEntry.getKey();
- List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
- boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
-
- if (!destinationFs.exists(destination)
- && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
- LOG.error("Failed to create destination directory: " + destination);
- throw new IOException("Destination directory creation failed");
- }
+ UserGroupInformation proxyUser = getProxyUser();
+ try {
+ for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
+ Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
+ for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+ Path destination = destMapEntry.getKey();
+ List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+ // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
+ // For instance, doCopyRetry closes the file system in case of i/o exceptions.
+ FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
+ boolean useRegularCopy = regularCopy(sourceFs, fileInfoList);
+
+ if (!destinationFs.exists(destination)
+ && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+ LOG.error("Failed to create destination directory: " + destination);
+ throw new IOException("Destination directory creation failed");
+ }
- // Copy files with retry logic on failure or source file is dropped or changed.
- doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
+ // Copy files with retry logic on failure or source file is dropped or changed.
+ doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy);
+ }
+ }
+ } finally {
+ if (proxyUser != null) {
+ FileSystem.closeAllForUGI(proxyUser);
}
}
}
private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
- FileSystem destinationFs, Path destination,
+ Path destination, UserGroupInformation proxyUser,
boolean useRegularCopy) throws IOException, LoginException, HiveFatalException {
int repeat = 0;
boolean isCopyError = false;
List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath);
- while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
+ while (!pathList.isEmpty() && (repeat < MAX_IO_RETRY)) {
try {
// if its retrying, first regenerate the path list.
if (repeat > 0) {
- pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError);
+ pathList = getFilesToRetry(sourceFs, srcFileList, destination, isCopyError);
if (pathList.isEmpty()) {
// all files were copied successfully in last try. So can break from here.
break;
@@ -113,7 +124,7 @@ public class CopyUtils {
// if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry.
isCopyError = true;
- doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy);
+ doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser);
// if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry.
isCopyError = false;
@@ -121,7 +132,7 @@ public class CopyUtils {
// If copy fails, fall through the retry logic
LOG.info("file operation failed", e);
- if (repeat >= (MAX_COPY_RETRY - 1)) {
+ if (repeat >= (MAX_IO_RETRY - 1)) {
//no need to wait in the last iteration
break;
}
@@ -136,7 +147,11 @@ public class CopyUtils {
}
// looks like some network outrage, reset the file system object and retry.
- FileSystem.closeAllForUGI(Utils.getUGI());
+ if (proxyUser == null) {
+ FileSystem.closeAllForUGI(Utils.getUGI());
+ } else {
+ FileSystem.closeAllForUGI(proxyUser);
+ }
sourceFs = pathList.get(0).getFileSystem(hiveConf);
destinationFs = destination.getFileSystem(hiveConf);
}
@@ -155,7 +170,7 @@ public class CopyUtils {
// If yes, then add to the retry list. If source file missing, then retry with CM path. if CM path
// itself is missing, then throw error.
private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
- FileSystem destinationFs, Path destination, boolean isCopyError)
+ Path destination, boolean isCopyError)
throws IOException, HiveFatalException {
List<Path> pathList = new ArrayList<Path>();
@@ -238,23 +253,54 @@ public class CopyUtils {
return false;
}
+ private UserGroupInformation getProxyUser() throws LoginException, IOException {
+ if (copyAsUser == null) {
+ return null;
+ }
+ UserGroupInformation proxyUser = null;
+ int currentRetry = 0;
+ while (currentRetry <= MAX_IO_RETRY) {
+ try {
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+ if (!currentUser.equals(copyAsUser)) {
+ proxyUser = UserGroupInformation.createProxyUser(
+ copyAsUser, UserGroupInformation.getLoginUser());
+ }
+ return proxyUser;
+ } catch (IOException e) {
+ currentRetry++;
+ if (currentRetry <= MAX_IO_RETRY) {
+ LOG.warn("Unable to get UGI info", e);
+ } else {
+ LOG.error("Unable to get UGI info", e);
+ throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+ }
+ int sleepTime = FileUtils.getSleepTime(currentRetry);
+ LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException timerEx) {
+ LOG.info("Sleep interrupted", timerEx.getMessage());
+ }
+ }
+ }
+ return null;
+ }
+
// Copy without retry
private void doCopyOnce(FileSystem sourceFs, List<Path> srcList,
- FileSystem destinationFs, Path destination,
- boolean useRegularCopy) throws IOException, LoginException {
- UserGroupInformation ugi = Utils.getUGI();
- String currentUser = ugi.getShortUserName();
- boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser);
-
+ Path destination,
+ boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException {
if (useRegularCopy) {
- doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, usePrivilegedUser);
+ doRegularCopyOnce(sourceFs, srcList, destination, proxyUser);
} else {
- doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser);
+ doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser);
}
}
private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
- boolean usePrivilegedUser) throws IOException {
+ UserGroupInformation proxyUser) throws IOException {
if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
srcList = srcList.stream().map(path -> {
URI uri = path.toUri();
@@ -271,7 +317,7 @@ public class CopyUtils {
srcList, // list of source paths
destination,
false,
- usePrivilegedUser ? copyAsUser : null,
+ proxyUser,
hiveConf,
ShimLoader.getHadoopShims())) {
LOG.error("Distcp failed to copy files: " + srcList + " to destination: " + destination);
@@ -279,17 +325,15 @@ public class CopyUtils {
}
}
- private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs,
- Path destination, boolean usePrivilegedUser) throws IOException {
+ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
+ Path destination, UserGroupInformation proxyUser) throws IOException {
/*
even for regular copy we have to use the same user permissions that distCp will use since
hive-server user might be different that the super user required to copy relevant files.
*/
final Path[] paths = srcList.toArray(new Path[] {});
- if (usePrivilegedUser) {
+ if (proxyUser != null) {
final Path finalDestination = destination;
- UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
- copyAsUser, UserGroupInformation.getLoginUser());
try {
proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
FileUtil
@@ -306,15 +350,21 @@ public class CopyUtils {
public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
- FileSystem destinationFs = destination.getFileSystem(hiveConf);
-
- for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
- final FileSystem sourceFs = entry.getKey();
- List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
- path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
- doCopyOnce(sourceFs, entry.getValue(),
- destinationFs, destination,
- regularCopy(destinationFs, sourceFs, fileList));
+
+ UserGroupInformation proxyUser = getProxyUser();
+ try {
+ for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
+ final FileSystem sourceFs = entry.getKey();
+ List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
+ path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
+ doCopyOnce(sourceFs, entry.getValue(),
+ destination,
+ regularCopy(sourceFs, fileList), proxyUser);
+ }
+ } finally {
+ if (proxyUser != null) {
+ FileSystem.closeAllForUGI(proxyUser);
+ }
}
}
@@ -325,7 +375,7 @@ public class CopyUtils {
3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size.
4. number of files of all source Paths(can be directory / file) is less than configured size.
*/
- boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList)
+ boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList)
throws IOException {
if (hiveInTest) {
return true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index e8eaae6..fc5419c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -103,7 +103,7 @@ public class FileOperations {
srcPaths.add(fileStatus.getPath());
}
- new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths);
+ new CopyUtils(distCpDoAsUser, hiveConf, toPath.getFileSystem(hiveConf)).doCopy(toPath, srcPaths);
}
private void copyMmPath() throws LoginException, IOException {
@@ -135,7 +135,8 @@ public class FileOperations {
}
Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}",
dirWithOriginals, exportRootDataDir);
- new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
+ new CopyUtils(distCpDoAsUser, hiveConf, exportRootDataDir.getFileSystem(hiveConf)).
+ doCopy(exportRootDataDir, srcPaths);
}
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
index 7bd660b..610af09 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
@@ -65,7 +65,7 @@ public class TestCopyUtils {
HiveConf conf = Mockito.spy(new HiveConf());
doReturn(1L).when(conf).getLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 32L * 1024 * 1024);
- CopyUtils copyUtils = new CopyUtils("", conf);
+ CopyUtils copyUtils = new CopyUtils("", conf, null);
long MB_128 = 128 * 1024 * 1024;
assertFalse(copyUtils.limitReachedForLocalCopy(MB_128, 1L));
}
@@ -76,7 +76,7 @@ public class TestCopyUtils {
when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
HiveConf conf = Mockito.spy(new HiveConf());
- CopyUtils copyUtils = new CopyUtils("", conf);
+ CopyUtils copyUtils = new CopyUtils("", conf, null);
long MB_16 = 16 * 1024 * 1024;
assertFalse(copyUtils.limitReachedForLocalCopy(MB_16, 100L));
}
@@ -88,7 +88,7 @@ public class TestCopyUtils {
FileSystem fs = mock(FileSystem.class);
List<Path> srcPaths = Arrays.asList(source, source);
HiveConf conf = mock(HiveConf.class);
- CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf));
+ CopyUtils copyUtils = Mockito.spy(new CopyUtils(null, conf, fs));
mockStatic(FileUtils.class);
mockStatic(Utils.class);
@@ -99,7 +99,7 @@ public class TestCopyUtils {
same(ShimLoader.getHadoopShims())))
.thenReturn(false);
when(Utils.getUGI()).thenReturn(mock(UserGroupInformation.class));
- doReturn(false).when(copyUtils).regularCopy(same(fs), same(fs), anyListOf(ReplChangeManager.FileInfo.class));
+ doReturn(false).when(copyUtils).regularCopy(same(fs), anyListOf(ReplChangeManager.FileInfo.class));
copyUtils.doCopy(destination, srcPaths);
}
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e774419..9a1e590 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1142,9 +1142,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
@Override
- public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException {
- UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
- doAsUser, UserGroupInformation.getLoginUser());
+ public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf,
+ UserGroupInformation proxyUser) throws IOException {
try {
return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c569b24..49a2ab3 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -500,10 +500,11 @@ public interface HadoopShims {
* @param srcPaths List of Path to the source files or directories to copy
* @param dst Path to the destination file or directory
* @param conf The hadoop configuration object
- * @param doAsUser The user to perform the distcp as
+ * @param proxyUser The user to perform the distcp as
* @return True if it is successfull; False otherwise.
*/
- public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException;
+ boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, UserGroupInformation proxyUser)
+ throws IOException;
/**
* Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.