You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by aa...@apache.org on 2021/09/23 14:58:48 UTC

[hive] branch master updated: HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)

This is an automated email from the ASF dual-hosted git repository.

aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aa812a  HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)
6aa812a is described below

commit 6aa812a88163a204f63cb84f5f581a533d2f54a0
Author: Haymant Mangla <79...@users.noreply.github.com>
AuthorDate: Thu Sep 23 20:28:32 2021 +0530

    HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena)
    
    * HIVE-25330: Make FS calls in CopyUtils retryable
    
    * NPE Corrected
    
    * Retries fail on parent exceptions.
    
    * Changed to HashSet
---
 .../apache/hadoop/hive/ql/exec/util/Retryable.java |  45 ++++--
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       | 159 +++++++++++++--------
 .../hadoop/hive/ql/parse/repl/TestCopyUtils.java   | 100 ++++++++++++-
 3 files changed, 227 insertions(+), 77 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
index a31b96b..01b486d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -36,16 +37,18 @@ public class Retryable {
   private static final long MINIMUM_DELAY_IN_SEC = 60;
 
   private long totalDurationInSeconds;
-  private List<Class<? extends Exception>> retryOn;
-  private List<Class<? extends Exception>> failOn;
+  private Set<Class<? extends Exception>> retryOn;
+  private Set<Class<? extends Exception>> failOn;
+  private Set<Class<? extends Exception>> failOnParentExceptions;
   private long initialDelayInSeconds;
   private long maxRetryDelayInSeconds;
   private double backOff;
   private int maxJitterInSeconds;
 
   private Retryable() {
-    this.retryOn = new ArrayList<>();
-    this.failOn = new ArrayList<>();
+    this.retryOn = new HashSet<>();
+    this.failOn = new HashSet<>();
+    this.failOnParentExceptions = new HashSet<>();
     this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
       HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
     this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
@@ -74,7 +77,8 @@ public class Retryable {
           return callable.call();
         }
       } catch (Exception e) {
-        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+        if (this.failOnParentExceptions.stream().noneMatch(k -> k.isAssignableFrom(e.getClass()))
+          && this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
           && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
           if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
             // case where waiting would go beyond max duration. So throw exception and return
@@ -149,8 +153,7 @@ public class Retryable {
 
     // making this thread safe as it appends to list
     public synchronized Builder withRetryOnException(final Class<? extends Exception> exceptionClass) {
-      if (exceptionClass != null &&
-        runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+      if (exceptionClass != null) {
         runnable.retryOn.add(exceptionClass);
       }
       return this;
@@ -158,17 +161,32 @@ public class Retryable {
 
     public synchronized Builder withRetryOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) {
       for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
-        if (exceptionClass != null &&
-          runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+        if (exceptionClass != null) {
           runnable.retryOn.add(exceptionClass);
         }
       }
       return this;
     }
 
+    public synchronized Builder withFailOnParentException(final Class<? extends Exception> exceptionClass) {
+      if (exceptionClass != null) {
+        runnable.failOnParentExceptions.add(exceptionClass);
+      }
+      return this;
+    }
+
+    public synchronized Builder withFailOnParentExceptionList(final List<Class<?
+            extends Exception>> exceptionClassList) {
+      for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
+        if (exceptionClass != null) {
+          runnable.failOnParentExceptions.add(exceptionClass);
+        }
+      }
+      return this;
+    }
+
     public synchronized Builder withFailOnException(final Class<? extends Exception> exceptionClass) {
-      if (exceptionClass != null &&
-        runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+      if (exceptionClass != null) {
         runnable.failOn.add(exceptionClass);
       }
       return this;
@@ -177,8 +195,7 @@ public class Retryable {
     public synchronized Builder withFailOnExceptionList(final List<Class<?
       extends Exception>> exceptionClassList) {
       for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
-        if (exceptionClass != null &&
-          runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+        if (exceptionClass != null) {
           runnable.failOn.add(exceptionClass);
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index db68250..2f9c071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,6 +67,16 @@ public class CopyUtils {
   private FileSystem destinationFs;
   private final int maxParallelCopyTask;
 
+  private List<Class<? extends Exception>> failOnParentExceptionList = Arrays.asList(org.apache.hadoop.fs.PathIOException.class,
+          org.apache.hadoop.fs.UnsupportedFileSystemException.class,
+          org.apache.hadoop.fs.InvalidPathException.class,
+          org.apache.hadoop.fs.InvalidRequestException.class,
+          org.apache.hadoop.fs.FileAlreadyExistsException.class,
+          org.apache.hadoop.fs.ChecksumException.class,
+          org.apache.hadoop.fs.ParentNotDirectoryException.class,
+          org.apache.hadoop.hdfs.protocol.QuotaExceededException.class,
+          FileNotFoundException.class);
+
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
@@ -76,6 +87,57 @@ public class CopyUtils {
     this.destinationFs = destinationFs;
   }
 
+  private <T> T retryableFxn(Callable<T> callable) throws IOException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(hiveConf)
+            .withRetryOnException(IOException.class).withFailOnParentExceptionList(failOnParentExceptionList).build();
+    try {
+      return retryable.executeCallable(() -> callable.call());
+    } catch (Exception e) {
+      if (failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))) {
+        throw new IOException(e);
+      }
+      throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg(), e);
+    }
+  }
+
+  @VisibleForTesting
+  String checkSumFor(Path srcFile, FileSystem fs) throws IOException {
+    return retryableFxn(() ->  ReplChangeManager.checksumFor(srcFile, fs));
+  }
+
+  @VisibleForTesting
+  void copyFilesBetweenFS(FileSystem sourceFs, Path[] paths, FileSystem destinationFs,
+                                  Path finalDestination, boolean deleteSource, boolean overwrite) throws IOException {
+    retryableFxn(() -> FileUtil
+            .copy(sourceFs, paths, destinationFs, finalDestination, deleteSource, overwrite, hiveConf));
+  }
+
+  @VisibleForTesting
+  boolean exists(FileSystem fs, Path path) throws IOException {
+    return retryableFxn(() -> fs.exists(path));
+  }
+
+  @VisibleForTesting
+  boolean delete(FileSystem fs, Path path, boolean recursive) throws IOException {
+    return retryableFxn(() -> fs.delete(path, recursive));
+  }
+
+  @VisibleForTesting
+  boolean mkdirs(FileSystem fs, Path path) throws IOException {
+    return retryableFxn(() -> fs.mkdirs(path));
+  }
+
+  @VisibleForTesting
+  boolean rename(FileSystem fs, Path srcPath, Path dstPath) throws IOException {
+    return retryableFxn(() -> fs.rename(srcPath, dstPath));
+  }
+
+  @VisibleForTesting
+  ContentSummary getContentSummary(FileSystem fs, Path f) throws IOException {
+    return retryableFxn(() -> fs.getContentSummary(f));
+  }
+
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
@@ -131,9 +193,6 @@ public class CopyUtils {
       if (executorService != null) {
         executorService.shutdown();
       }
-      if (proxyUser != null) {
-        FileSystem.closeAllForUGI(proxyUser);
-      }
     }
   }
 
@@ -144,15 +203,13 @@ public class CopyUtils {
 
   @VisibleForTesting
   void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser,
-                      boolean useRegularCopy, boolean overwrite) throws IOException, LoginException,
-    HiveFatalException {
+                      boolean useRegularCopy, boolean overwrite) throws IOException, LoginException, HiveFatalException {
     Path destination = destMapEntry.getKey();
     List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
     // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
     // For instance, doCopyRetry closes the file system in case of i/o exceptions.
     FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
-    if (!destinationFs.exists(destination)
-      && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+    if (!exists(destinationFs, destination) && !mkdirs(destinationFs, destination)) {
       LOG.error("Failed to create destination directory: " + destination);
       throw new IOException("Destination directory creation failed");
     }
@@ -190,11 +247,14 @@ public class CopyUtils {
         // If copy fails, fall through the retry logic
         LOG.info("file operation failed", e);
 
-        if (repeat >= (MAX_IO_RETRY - 1)) {
-          //no need to wait in the last iteration
+        //Don't retry in the following cases:
+        //1. This is the last retry attempt.
+        //2. Execution already hit an exception that should not be retried.
+        //3. Retries were already exhausted by the FS operations.
+        if (repeat >= (MAX_IO_RETRY - 1) || failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))
+                || ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg().equals(e.getMessage())) {
           break;
         }
-
         if (!(e instanceof FileNotFoundException)) {
           int sleepTime = FileUtils.getSleepTime(repeat);
           LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (repeat+1));
@@ -205,11 +265,6 @@ public class CopyUtils {
           }
 
           // looks like some network outrage, reset the file system object and retry.
-          if (proxyUser == null) {
-            FileSystem.closeAllForUGI(Utils.getUGI());
-          } else {
-            FileSystem.closeAllForUGI(proxyUser);
-          }
           sourceFs = pathList.get(0).getFileSystem(hiveConf);
           destinationFs = destination.getFileSystem(hiveConf);
         }
@@ -240,11 +295,11 @@ public class CopyUtils {
       }
       Path srcPath = srcFile.getEffectivePath();
       //Path destPath = new Path(destination, srcPath.getName());
-      if (destinationFs.exists(destination)) {
+      if (exists(destinationFs, destination)) {
         // If destination file is present and checksum of source mismatch, then retry copy.
         if (isSourceFileMismatch(sourceFs, srcFile)) {
           // Delete the incorrectly copied file and retry with CM path
-          destinationFs.delete(destination, true);
+          delete(destinationFs, destination, true);
           srcFile.setIsUseSourcePath(false);
         } else {
           // If the retry logic is reached after copy error, then include the copied file as well.
@@ -270,7 +325,7 @@ public class CopyUtils {
         throw new HiveFatalException(ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getMsg());
       }
 
-      if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) {
+      if (!srcFile.isUseSourcePath() && !exists(sourceFs, srcFile.getCmPath())) {
         // CM path itself is missing, cannot recover from this error
         LOG.error("File Copy Failed. Both source and CM files are missing from source. "
                 + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". "
@@ -297,7 +352,7 @@ public class CopyUtils {
       String destFileName = srcFile.getCmPath().getName();
       Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
       Path destFile = new Path(destRoot, destFileName);
-      if (dstFs.exists(destFile)) {
+      if (exists(dstFs, destFile)) {
         String destFileWithSourceName = srcFile.getSourcePath().getName();
         Path newDestFile = new Path(destRoot, destFileWithSourceName);
 
@@ -305,13 +360,13 @@ public class CopyUtils {
         // directly to table path (bypassing staging directory) then there might be some stale files from previous
         // incomplete/failed load. No need of recycle as this is a case of stale file.
         try {
-          dstFs.delete(newDestFile, true);
+          delete(dstFs, newDestFile, true);
           LOG.debug(" file " + newDestFile + " is deleted before renaming");
         } catch (FileNotFoundException e) {
           // no problem
         }
 
-        boolean result = dstFs.rename(destFile, newDestFile);
+        boolean result = rename(dstFs, destFile, newDestFile);
         if (!result) {
           throw new IllegalStateException(
                   "could not rename " + destFile.getName() + " to " + newDestFile.getName());
@@ -328,12 +383,11 @@ public class CopyUtils {
       if (sourceChecksumString != null) {
         String verifySourceChecksumString;
         try {
-          verifySourceChecksumString
-                  = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
+          verifySourceChecksumString = checkSumFor(srcFile.getSourcePath(), sourceFs);
         } catch (IOException e) {
           LOG.info("Unable to calculate checksum for source file: " + srcFile.getSourcePath(), e);
 
-          if (!sourceFs.exists(srcFile.getSourcePath())) {
+          if (!exists(sourceFs, srcFile.getSourcePath())) {
             // if source file is missing, then return true, so that cm path will be used for copy.
             return true;
           }
@@ -351,23 +405,13 @@ public class CopyUtils {
     if (copyAsUser == null) {
       return null;
     }
-    Retryable retryable = Retryable.builder()
-      .withHiveConf(hiveConf)
-      .withRetryOnException(IOException.class).build();
-    try {
-      return retryable.executeCallable(() -> {
-        UserGroupInformation proxyUser = null;
-        UserGroupInformation ugi = Utils.getUGI();
-        String currentUser = ugi.getShortUserName();
-        if (!currentUser.equals(copyAsUser)) {
-          proxyUser = UserGroupInformation.createProxyUser(
-            copyAsUser, UserGroupInformation.getLoginUser());
-        }
-        return proxyUser;
-      });
-    } catch (Exception e) {
-      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
-    }
+    return retryableFxn(() -> {
+      String currentUser = Utils.getUGI().getShortUserName();
+      if (!currentUser.equals(copyAsUser)) {
+        return UserGroupInformation.createProxyUser(copyAsUser, UserGroupInformation.getLoginUser());
+      }
+      return null;
+    });
   }
 
   // Copy without retry
@@ -423,8 +467,7 @@ public class CopyUtils {
           if (overWrite) {
             deleteSubDirs(destinationFs, destination);
           }
-          FileUtil
-              .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
+          copyFilesBetweenFS(sourceFs, paths, destinationFs, finalDestination, false, true);
           return true;
         });
       } catch (InterruptedException e) {
@@ -435,35 +478,29 @@ public class CopyUtils {
       if (overWrite) {
         deleteSubDirs(destinationFs, destination);
       }
-      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
+      copyFilesBetweenFS(sourceFs, paths, destinationFs, destination, false, true);
     }
   }
 
   private void deleteSubDirs(FileSystem fs, Path path) throws IOException {
     //Delete the root path instead of doing a listing
     //This is more optimised
-    fs.delete(path, true);
+    delete(fs, path, true);
     //Recreate just the Root folder
-    fs.mkdirs(path);
+    mkdirs(fs, path);
   }
 
   public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
     Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
 
     UserGroupInformation proxyUser = getProxyUser();
-    try {
-      for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
-        final FileSystem sourceFs = entry.getKey();
-        List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
-           path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
-        doCopyOnce(sourceFs, entry.getValue(),
-                destination,
-                regularCopy(sourceFs, fileList), proxyUser, false);
-      }
-    } finally {
-      if (proxyUser != null) {
-        FileSystem.closeAllForUGI(proxyUser);
-      }
+    for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
+      final FileSystem sourceFs = entry.getKey();
+      List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
+              path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
+      doCopyOnce(sourceFs, entry.getValue(),
+              destination,
+              regularCopy(sourceFs, fileList), proxyUser, false);
     }
   }
 
@@ -492,11 +529,11 @@ public class CopyUtils {
     for (ReplChangeManager.FileInfo fileInfo : fileList) {
       ContentSummary contentSummary = null;
       try {
-        contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath());
+        contentSummary = getContentSummary(sourceFs, fileInfo.getEffectivePath());
       } catch (IOException e) {
         // In replication, if source file does not exist, try cmroot
         if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) {
-          contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath());
+          contentSummary = getContentSummary(sourceFs, fileInfo.getCmPath());
           fileInfo.setIsUseSourcePath(false);
         }
       }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
index 94993bb..4740802 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.eq;
@@ -57,7 +59,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
  * Unit Test class for CopyUtils class.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class})
+@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class, ReplChangeManager.class})
 @PowerMockIgnore({ "javax.management.*" })
 public class TestCopyUtils {
   /*
@@ -111,6 +113,100 @@ public class TestCopyUtils {
   }
 
   @Test
+  public void testFSCallsFailOnParentExceptions() throws Exception {
+    mockStatic(UserGroupInformation.class);
+    mockStatic(ReplChangeManager.class);
+    when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
+    HiveConf conf = mock(HiveConf.class);
+    conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s");
+    FileSystem fs = mock(FileSystem.class);
+    Path source = mock(Path.class);
+    Path destination = mock(Path.class);
+    ContentSummary cs = mock(ContentSummary.class);
+
+    Exception exception = new org.apache.hadoop.fs.PathPermissionException("Failed");
+    when(ReplChangeManager.checksumFor(source, fs)).thenThrow(exception).thenReturn("dummy");
+    when(fs.exists(same(source))).thenThrow(exception).thenReturn(true);
+    when(fs.delete(same(source), anyBoolean())).thenThrow(exception).thenReturn(true);
+    when(fs.mkdirs(same(source))).thenThrow(exception).thenReturn(true);
+    when(fs.rename(same(source), same(destination))).thenThrow(exception).thenReturn(true);
+    when(fs.getContentSummary(same(source))).thenThrow(exception).thenReturn(cs);
+
+    CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs);
+    CopyUtils copyUtilsSpy = Mockito.spy(copyUtils);
+    try {
+      copyUtilsSpy.exists(fs, source);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());
+    }
+    Mockito.verify(fs, Mockito.times(1)).exists(source);
+    try {
+      copyUtils.delete(fs, source, true);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());
+    }
+    Mockito.verify(fs, Mockito.times(1)).delete(source, true);
+    try {
+      copyUtils.mkdirs(fs, source);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());
+    }
+    Mockito.verify(fs, Mockito.times(1)).mkdirs(source);
+    try {
+      copyUtils.rename(fs, source, destination);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());
+    }
+    Mockito.verify(fs, Mockito.times(1)).rename(source, destination);
+    try {
+      copyUtilsSpy.getContentSummary(fs, source);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());;
+    }
+    Mockito.verify(fs, Mockito.times(1)).getContentSummary(source);
+    try {
+      copyUtilsSpy.checkSumFor(source, fs);
+    } catch (Exception e) {
+      assertEquals(exception.getClass(), e.getCause().getClass());
+    }
+    Mockito.verify(copyUtilsSpy, Mockito.times(1)).checkSumFor(source, fs);
+  }
+
+  @Test
+  public void testRetryableFSCalls() throws Exception {
+    mockStatic(UserGroupInformation.class);
+    mockStatic(ReplChangeManager.class);
+    when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
+    HiveConf conf = mock(HiveConf.class);
+    conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s");
+    FileSystem fs = mock(FileSystem.class);
+    Path source = mock(Path.class);
+    Path destination = mock(Path.class);
+    ContentSummary cs = mock(ContentSummary.class);
+
+    when(ReplChangeManager.checksumFor(source, fs)).thenThrow(new IOException("Failed")).thenReturn("dummy");
+    when(fs.exists(same(source))).thenThrow(new IOException("Failed")).thenReturn(true);
+    when(fs.delete(same(source), anyBoolean())).thenThrow(new IOException("Failed")).thenReturn(true);
+    when(fs.mkdirs(same(source))).thenThrow(new IOException("Failed")).thenReturn(true);
+    when(fs.rename(same(source), same(destination))).thenThrow(new IOException("Failed")).thenReturn(true);
+    when(fs.getContentSummary(same(source))).thenThrow(new IOException("Failed")).thenReturn(cs);
+
+    CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs);
+    CopyUtils copyUtilsSpy = Mockito.spy(copyUtils);
+    assertEquals (true, copyUtilsSpy.exists(fs, source));
+    Mockito.verify(fs, Mockito.times(2)).exists(source);
+    assertEquals (true, copyUtils.delete(fs, source, true));
+    Mockito.verify(fs, Mockito.times(2)).delete(source, true);
+    assertEquals (true, copyUtils.mkdirs(fs, source));
+    Mockito.verify(fs, Mockito.times(2)).mkdirs(source);
+    assertEquals (true, copyUtils.rename(fs, source, destination));
+    Mockito.verify(fs, Mockito.times(2)).rename(source, destination);
+    assertEquals (cs, copyUtilsSpy.getContentSummary(fs, source));
+    Mockito.verify(fs, Mockito.times(2)).getContentSummary(source);
+    assertEquals ("dummy", copyUtilsSpy.checkSumFor(source, fs));
+  }
+
+  @Test
   public void testParallelCopySuccess() throws Exception {
     mockStatic(UserGroupInformation.class);
     when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
@@ -148,4 +244,4 @@ public class TestCopyUtils {
     Mockito.verify(mockExecutorService,
       Mockito.times(1)).invokeAll(callableCapture.capture());
   }
-}
\ No newline at end of file
+}