You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by aa...@apache.org on 2021/09/23 14:58:48 UTC
[hive] branch master updated: HIVE-25330: Make FS calls in
CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)
This is an automated email from the ASF dual-hosted git repository.
aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6aa812a HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)
6aa812a is described below
commit 6aa812a88163a204f63cb84f5f581a533d2f54a0
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Thu Sep 23 20:28:32 2021 +0530
HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)
* HIVE-25330: Make FS calls in CopyUtils retryable
* NPE Corrected
* Retries fail on parent exceptions.
* Changed to HashSet
---
.../apache/hadoop/hive/ql/exec/util/Retryable.java | 45 ++++--
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 159 +++++++++++++--------
.../hadoop/hive/ql/parse/repl/TestCopyUtils.java | 100 ++++++++++++-
3 files changed, 227 insertions(+), 77 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
index a31b96b..01b486d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -36,16 +37,18 @@ public class Retryable {
private static final long MINIMUM_DELAY_IN_SEC = 60;
private long totalDurationInSeconds;
- private List<Class<? extends Exception>> retryOn;
- private List<Class<? extends Exception>> failOn;
+ private Set<Class<? extends Exception>> retryOn;
+ private Set<Class<? extends Exception>> failOn;
+ private Set<Class<? extends Exception>> failOnParentExceptions;
private long initialDelayInSeconds;
private long maxRetryDelayInSeconds;
private double backOff;
private int maxJitterInSeconds;
private Retryable() {
- this.retryOn = new ArrayList<>();
- this.failOn = new ArrayList<>();
+ this.retryOn = new HashSet<>();
+ this.failOn = new HashSet<>();
+ this.failOnParentExceptions = new HashSet<>();
this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
@@ -74,7 +77,8 @@ public class Retryable {
return callable.call();
}
} catch (Exception e) {
- if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+ if (this.failOnParentExceptions.stream().noneMatch(k -> k.isAssignableFrom(e.getClass()))
+ && this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
&& this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
// case where waiting would go beyond max duration. So throw exception and return
@@ -149,8 +153,7 @@ public class Retryable {
// making this thread safe as it appends to list
public synchronized Builder withRetryOnException(final Class<? extends Exception> exceptionClass) {
- if (exceptionClass != null &&
- runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+ if (exceptionClass != null) {
runnable.retryOn.add(exceptionClass);
}
return this;
@@ -158,17 +161,32 @@ public class Retryable {
public synchronized Builder withRetryOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) {
for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
- if (exceptionClass != null &&
- runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+ if (exceptionClass != null) {
runnable.retryOn.add(exceptionClass);
}
}
return this;
}
+ public synchronized Builder withFailOnParentException(final Class<? extends Exception> exceptionClass) {
+ if (exceptionClass != null) {
+ runnable.failOnParentExceptions.add(exceptionClass);
+ }
+ return this;
+ }
+
+ public synchronized Builder withFailOnParentExceptionList(final List<Class<?
+ extends Exception>> exceptionClassList) {
+ for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
+ if (exceptionClass != null) {
+ runnable.failOnParentExceptions.add(exceptionClass);
+ }
+ }
+ return this;
+ }
+
public synchronized Builder withFailOnException(final Class<? extends Exception> exceptionClass) {
- if (exceptionClass != null &&
- runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+ if (exceptionClass != null) {
runnable.failOn.add(exceptionClass);
}
return this;
@@ -177,8 +195,7 @@ public class Retryable {
public synchronized Builder withFailOnExceptionList(final List<Class<?
extends Exception>> exceptionClassList) {
for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
- if (exceptionClass != null &&
- runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+ if (exceptionClass != null) {
runnable.failOn.add(exceptionClass);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index db68250..2f9c071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -66,6 +67,16 @@ public class CopyUtils {
private FileSystem destinationFs;
private final int maxParallelCopyTask;
+ private List<Class<? extends Exception>> failOnParentExceptionList = Arrays.asList(org.apache.hadoop.fs.PathIOException.class,
+ org.apache.hadoop.fs.UnsupportedFileSystemException.class,
+ org.apache.hadoop.fs.InvalidPathException.class,
+ org.apache.hadoop.fs.InvalidRequestException.class,
+ org.apache.hadoop.fs.FileAlreadyExistsException.class,
+ org.apache.hadoop.fs.ChecksumException.class,
+ org.apache.hadoop.fs.ParentNotDirectoryException.class,
+ org.apache.hadoop.hdfs.protocol.QuotaExceededException.class,
+ FileNotFoundException.class);
+
public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
this.hiveConf = hiveConf;
maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
@@ -76,6 +87,57 @@ public class CopyUtils {
this.destinationFs = destinationFs;
}
+ private <T> T retryableFxn(Callable<T> callable) throws IOException {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(hiveConf)
+ .withRetryOnException(IOException.class).withFailOnParentExceptionList(failOnParentExceptionList).build();
+ try {
+ return retryable.executeCallable(() -> callable.call());
+ } catch (Exception e) {
+ if (failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))) {
+ throw new IOException(e);
+ }
+ throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg(), e);
+ }
+ }
+
+ @VisibleForTesting
+ String checkSumFor(Path srcFile, FileSystem fs) throws IOException {
+ return retryableFxn(() -> ReplChangeManager.checksumFor(srcFile, fs));
+ }
+
+ @VisibleForTesting
+ void copyFilesBetweenFS(FileSystem sourceFs, Path[] paths, FileSystem destinationFs,
+ Path finalDestination, boolean deleteSource, boolean overwrite) throws IOException {
+ retryableFxn(() -> FileUtil
+ .copy(sourceFs, paths, destinationFs, finalDestination, deleteSource, overwrite, hiveConf));
+ }
+
+ @VisibleForTesting
+ boolean exists(FileSystem fs, Path path) throws IOException {
+ return retryableFxn(() -> fs.exists(path));
+ }
+
+ @VisibleForTesting
+ boolean delete(FileSystem fs, Path path, boolean recursive) throws IOException {
+ return retryableFxn(() -> fs.delete(path, recursive));
+ }
+
+ @VisibleForTesting
+ boolean mkdirs(FileSystem fs, Path path) throws IOException {
+ return retryableFxn(() -> fs.mkdirs(path));
+ }
+
+ @VisibleForTesting
+ boolean rename(FileSystem fs, Path srcPath, Path dstPath) throws IOException {
+ return retryableFxn(() -> fs.rename(srcPath, dstPath));
+ }
+
+ @VisibleForTesting
+ ContentSummary getContentSummary(FileSystem fs, Path f) throws IOException {
+ return retryableFxn(() -> fs.getContentSummary(f));
+ }
+
// Used by replication, copy files from source to destination. It is possible source file is
// changed/removed during copy, so double check the checksum after copy,
// if not match, copy again from cm
@@ -131,9 +193,6 @@ public class CopyUtils {
if (executorService != null) {
executorService.shutdown();
}
- if (proxyUser != null) {
- FileSystem.closeAllForUGI(proxyUser);
- }
}
}
@@ -144,15 +203,13 @@ public class CopyUtils {
@VisibleForTesting
void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser,
- boolean useRegularCopy, boolean overwrite) throws IOException, LoginException,
- HiveFatalException {
+ boolean useRegularCopy, boolean overwrite) throws IOException, LoginException, HiveFatalException {
Path destination = destMapEntry.getKey();
List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
// Get the file system again from cache. There is a chance that the file system stored in the map is closed.
// For instance, doCopyRetry closes the file system in case of i/o exceptions.
FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
- if (!destinationFs.exists(destination)
- && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+ if (!exists(destinationFs, destination) && !mkdirs(destinationFs, destination)) {
LOG.error("Failed to create destination directory: " + destination);
throw new IOException("Destination directory creation failed");
}
@@ -190,11 +247,14 @@ public class CopyUtils {
// If copy fails, fall through the retry logic
LOG.info("file operation failed", e);
- if (repeat >= (MAX_IO_RETRY - 1)) {
- //no need to wait in the last iteration
+ //Don't retry in the following cases:
+ //1. This is last attempt of retry.
+ //2. Execution already hit the exception which should not be retried.
+ //3. Retry is already exhausted by FS operations.
+ if (repeat >= (MAX_IO_RETRY - 1) || failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))
+ || ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg().equals(e.getMessage())) {
break;
}
-
if (!(e instanceof FileNotFoundException)) {
int sleepTime = FileUtils.getSleepTime(repeat);
LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (repeat+1));
@@ -205,11 +265,6 @@ public class CopyUtils {
}
// looks like some network outrage, reset the file system object and retry.
- if (proxyUser == null) {
- FileSystem.closeAllForUGI(Utils.getUGI());
- } else {
- FileSystem.closeAllForUGI(proxyUser);
- }
sourceFs = pathList.get(0).getFileSystem(hiveConf);
destinationFs = destination.getFileSystem(hiveConf);
}
@@ -240,11 +295,11 @@ public class CopyUtils {
}
Path srcPath = srcFile.getEffectivePath();
//Path destPath = new Path(destination, srcPath.getName());
- if (destinationFs.exists(destination)) {
+ if (exists(destinationFs, destination)) {
// If destination file is present and checksum of source mismatch, then retry copy.
if (isSourceFileMismatch(sourceFs, srcFile)) {
// Delete the incorrectly copied file and retry with CM path
- destinationFs.delete(destination, true);
+ delete(destinationFs, destination, true);
srcFile.setIsUseSourcePath(false);
} else {
// If the retry logic is reached after copy error, then include the copied file as well.
@@ -270,7 +325,7 @@ public class CopyUtils {
throw new HiveFatalException(ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getMsg());
}
- if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) {
+ if (!srcFile.isUseSourcePath() && !exists(sourceFs, srcFile.getCmPath())) {
// CM path itself is missing, cannot recover from this error
LOG.error("File Copy Failed. Both source and CM files are missing from source. "
+ "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". "
@@ -297,7 +352,7 @@ public class CopyUtils {
String destFileName = srcFile.getCmPath().getName();
Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
Path destFile = new Path(destRoot, destFileName);
- if (dstFs.exists(destFile)) {
+ if (exists(dstFs, destFile)) {
String destFileWithSourceName = srcFile.getSourcePath().getName();
Path newDestFile = new Path(destRoot, destFileWithSourceName);
@@ -305,13 +360,13 @@ public class CopyUtils {
// directly to table path (bypassing staging directory) then there might be some stale files from previous
// incomplete/failed load. No need of recycle as this is a case of stale file.
try {
- dstFs.delete(newDestFile, true);
+ delete(dstFs, newDestFile, true);
LOG.debug(" file " + newDestFile + " is deleted before renaming");
} catch (FileNotFoundException e) {
// no problem
}
- boolean result = dstFs.rename(destFile, newDestFile);
+ boolean result = rename(dstFs, destFile, newDestFile);
if (!result) {
throw new IllegalStateException(
"could not rename " + destFile.getName() + " to " + newDestFile.getName());
@@ -328,12 +383,11 @@ public class CopyUtils {
if (sourceChecksumString != null) {
String verifySourceChecksumString;
try {
- verifySourceChecksumString
- = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
+ verifySourceChecksumString = checkSumFor(srcFile.getSourcePath(), sourceFs);
} catch (IOException e) {
LOG.info("Unable to calculate checksum for source file: " + srcFile.getSourcePath(), e);
- if (!sourceFs.exists(srcFile.getSourcePath())) {
+ if (!exists(sourceFs, srcFile.getSourcePath())) {
// if source file is missing, then return true, so that cm path will be used for copy.
return true;
}
@@ -351,23 +405,13 @@ public class CopyUtils {
if (copyAsUser == null) {
return null;
}
- Retryable retryable = Retryable.builder()
- .withHiveConf(hiveConf)
- .withRetryOnException(IOException.class).build();
- try {
- return retryable.executeCallable(() -> {
- UserGroupInformation proxyUser = null;
- UserGroupInformation ugi = Utils.getUGI();
- String currentUser = ugi.getShortUserName();
- if (!currentUser.equals(copyAsUser)) {
- proxyUser = UserGroupInformation.createProxyUser(
- copyAsUser, UserGroupInformation.getLoginUser());
- }
- return proxyUser;
- });
- } catch (Exception e) {
- throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
- }
+ return retryableFxn(() -> {
+ String currentUser = Utils.getUGI().getShortUserName();
+ if (!currentUser.equals(copyAsUser)) {
+ return UserGroupInformation.createProxyUser(copyAsUser, UserGroupInformation.getLoginUser());
+ }
+ return null;
+ });
}
// Copy without retry
@@ -423,8 +467,7 @@ public class CopyUtils {
if (overWrite) {
deleteSubDirs(destinationFs, destination);
}
- FileUtil
- .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
+ copyFilesBetweenFS(sourceFs, paths, destinationFs, finalDestination, false, true);
return true;
});
} catch (InterruptedException e) {
@@ -435,35 +478,29 @@ public class CopyUtils {
if (overWrite) {
deleteSubDirs(destinationFs, destination);
}
- FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
+ copyFilesBetweenFS(sourceFs, paths, destinationFs, destination, false, true);
}
}
private void deleteSubDirs(FileSystem fs, Path path) throws IOException {
//Delete the root path instead of doing a listing
//This is more optimised
- fs.delete(path, true);
+ delete(fs, path, true);
//Recreate just the Root folder
- fs.mkdirs(path);
+ mkdirs(fs, path);
}
public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
UserGroupInformation proxyUser = getProxyUser();
- try {
- for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
- final FileSystem sourceFs = entry.getKey();
- List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
- path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
- doCopyOnce(sourceFs, entry.getValue(),
- destination,
- regularCopy(sourceFs, fileList), proxyUser, false);
- }
- } finally {
- if (proxyUser != null) {
- FileSystem.closeAllForUGI(proxyUser);
- }
+ for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
+ final FileSystem sourceFs = entry.getKey();
+ List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
+ path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
+ doCopyOnce(sourceFs, entry.getValue(),
+ destination,
+ regularCopy(sourceFs, fileList), proxyUser, false);
}
}
@@ -492,11 +529,11 @@ public class CopyUtils {
for (ReplChangeManager.FileInfo fileInfo : fileList) {
ContentSummary contentSummary = null;
try {
- contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath());
+ contentSummary = getContentSummary(sourceFs, fileInfo.getEffectivePath());
} catch (IOException e) {
// In replication, if source file does not exist, try cmroot
if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) {
- contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath());
+ contentSummary = getContentSummary(sourceFs, fileInfo.getCmPath());
fileInfo.setIsUseSourcePath(false);
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
index 94993bb..4740802 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyListOf;
import static org.mockito.ArgumentMatchers.eq;
@@ -57,7 +59,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
* Unit Test class for CopyUtils class.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class})
+@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class, ReplChangeManager.class})
@PowerMockIgnore({ "javax.management.*" })
public class TestCopyUtils {
/*
@@ -111,6 +113,100 @@ public class TestCopyUtils {
}
@Test
+ public void testFSCallsFailOnParentExceptions() throws Exception {
+ mockStatic(UserGroupInformation.class);
+ mockStatic(ReplChangeManager.class);
+ when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
+ HiveConf conf = mock(HiveConf.class);
+ conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s");
+ FileSystem fs = mock(FileSystem.class);
+ Path source = mock(Path.class);
+ Path destination = mock(Path.class);
+ ContentSummary cs = mock(ContentSummary.class);
+
+ Exception exception = new org.apache.hadoop.fs.PathPermissionException("Failed");
+ when(ReplChangeManager.checksumFor(source, fs)).thenThrow(exception).thenReturn("dummy");
+ when(fs.exists(same(source))).thenThrow(exception).thenReturn(true);
+ when(fs.delete(same(source), anyBoolean())).thenThrow(exception).thenReturn(true);
+ when(fs.mkdirs(same(source))).thenThrow(exception).thenReturn(true);
+ when(fs.rename(same(source), same(destination))).thenThrow(exception).thenReturn(true);
+ when(fs.getContentSummary(same(source))).thenThrow(exception).thenReturn(cs);
+
+ CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs);
+ CopyUtils copyUtilsSpy = Mockito.spy(copyUtils);
+ try {
+ copyUtilsSpy.exists(fs, source);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());
+ }
+ Mockito.verify(fs, Mockito.times(1)).exists(source);
+ try {
+ copyUtils.delete(fs, source, true);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());
+ }
+ Mockito.verify(fs, Mockito.times(1)).delete(source, true);
+ try {
+ copyUtils.mkdirs(fs, source);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());
+ }
+ Mockito.verify(fs, Mockito.times(1)).mkdirs(source);
+ try {
+ copyUtils.rename(fs, source, destination);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());
+ }
+ Mockito.verify(fs, Mockito.times(1)).rename(source, destination);
+ try {
+ copyUtilsSpy.getContentSummary(fs, source);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());;
+ }
+ Mockito.verify(fs, Mockito.times(1)).getContentSummary(source);
+ try {
+ copyUtilsSpy.checkSumFor(source, fs);
+ } catch (Exception e) {
+ assertEquals(exception.getClass(), e.getCause().getClass());
+ }
+ Mockito.verify(copyUtilsSpy, Mockito.times(1)).checkSumFor(source, fs);
+ }
+
+ @Test
+ public void testRetryableFSCalls() throws Exception {
+ mockStatic(UserGroupInformation.class);
+ mockStatic(ReplChangeManager.class);
+ when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
+ HiveConf conf = mock(HiveConf.class);
+ conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s");
+ FileSystem fs = mock(FileSystem.class);
+ Path source = mock(Path.class);
+ Path destination = mock(Path.class);
+ ContentSummary cs = mock(ContentSummary.class);
+
+ when(ReplChangeManager.checksumFor(source, fs)).thenThrow(new IOException("Failed")).thenReturn("dummy");
+ when(fs.exists(same(source))).thenThrow(new IOException("Failed")).thenReturn(true);
+ when(fs.delete(same(source), anyBoolean())).thenThrow(new IOException("Failed")).thenReturn(true);
+ when(fs.mkdirs(same(source))).thenThrow(new IOException("Failed")).thenReturn(true);
+ when(fs.rename(same(source), same(destination))).thenThrow(new IOException("Failed")).thenReturn(true);
+ when(fs.getContentSummary(same(source))).thenThrow(new IOException("Failed")).thenReturn(cs);
+
+ CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs);
+ CopyUtils copyUtilsSpy = Mockito.spy(copyUtils);
+ assertEquals (true, copyUtilsSpy.exists(fs, source));
+ Mockito.verify(fs, Mockito.times(2)).exists(source);
+ assertEquals (true, copyUtils.delete(fs, source, true));
+ Mockito.verify(fs, Mockito.times(2)).delete(source, true);
+ assertEquals (true, copyUtils.mkdirs(fs, source));
+ Mockito.verify(fs, Mockito.times(2)).mkdirs(source);
+ assertEquals (true, copyUtils.rename(fs, source, destination));
+ Mockito.verify(fs, Mockito.times(2)).rename(source, destination);
+ assertEquals (cs, copyUtilsSpy.getContentSummary(fs, source));
+ Mockito.verify(fs, Mockito.times(2)).getContentSummary(source);
+ assertEquals ("dummy", copyUtilsSpy.checkSumFor(source, fs));
+ }
+
+ @Test
public void testParallelCopySuccess() throws Exception {
mockStatic(UserGroupInformation.class);
when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
@@ -148,4 +244,4 @@ public class TestCopyUtils {
Mockito.verify(mockExecutorService,
Mockito.times(1)).invokeAll(callableCapture.capture());
}
-}
\ No newline at end of file
+}