Posted to commits@hive.apache.org by an...@apache.org on 2020/08/10 05:41:08 UTC

[hive] branch master updated: HIVE-23955:Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new aebb74e  HIVE-23955:Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
aebb74e is described below

commit aebb74ea361b5018ffde96ae7ddfb4ee899d5329
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Aug 10 11:10:56 2020 +0530

    HIVE-23955:Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../org/apache/hadoop/hive/common/FileUtils.java   |   1 -
 .../java/org/apache/hadoop/hive/ql/ErrorMsg.java   |  38 +++---
 .../parse/TestReplicationScenariosAcidTables.java  |   2 +
 .../TestReplicationScenariosAcrossInstances.java   |  31 ++++-
 .../parse/TestTableLevelReplicationScenarios.java  |   2 +
 .../java/org/apache/hive/jdbc/TestJdbcDriver2.java |   7 -
 .../hadoop/hive/ql/exec/repl/AtlasDumpTask.java    |  60 +++++----
 .../hadoop/hive/ql/exec/repl/AtlasLoadTask.java    |  40 +++---
 .../hadoop/hive/ql/exec/repl/DirCopyTask.java      | 143 +++++++--------------
 .../hadoop/hive/ql/exec/repl/RangerDumpTask.java   |  16 ++-
 .../hadoop/hive/ql/exec/repl/RangerLoadTask.java   |  16 ++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  85 ++++++------
 .../hive/ql/exec/repl/ReplExternalTables.java      |  82 ++++++------
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   5 +-
 .../hive/ql/exec/repl/atlas/AtlasRestClient.java   |   7 +-
 .../ql/exec/repl/atlas/AtlasRestClientBuilder.java |   8 +-
 .../ql/exec/repl/atlas/AtlasRestClientImpl.java    |  41 +++---
 .../ql/exec/repl/atlas/NoOpAtlasRestClient.java    |   5 +-
 .../exec/repl/atlas/RetryingClientTimeBased.java   |  10 +-
 .../ql/exec/repl/ranger/RangerRestClientImpl.java  |  14 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  12 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       |  37 ++----
 .../hive/ql/exec/repl/TestRangerLoadTask.java      |   3 +-
 .../alter_table_wrong_location2.q.out              |   2 +-
 24 files changed, 345 insertions(+), 322 deletions(-)
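
Illustration (not part of the patch): after this change a client can branch on
replication error codes instead of matching message text. A minimal sketch, assuming
a hypothetical JDBC Statement `stmt` and a placeholder database name, using the same
getErrorCode() checks exercised in the updated tests below:

    try {
      stmt.execute("repl dump srcdb");
    } catch (SQLException e) {
      int code = e.getErrorCode();
      if (code == ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode()) {
        // transient failure: the retry budget was exhausted, the policy can simply be re-run
      } else if (code == ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode()) {
        // user error: fix the replication configuration before re-running
      }
    }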

diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 142d779..24657b1 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
 public final class FileUtils {
   private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class.getName());
   private static final Random random = new Random();
-  public static final int MAX_IO_ERROR_RETRY = 5;
   public static final int IO_ERROR_SLEEP_TIME = 100;
 
   public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index d943412..8c78da9 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -331,8 +331,8 @@ public enum ErrorMsg {
   TABLE_NOT_PARTITIONED(10241, "Table {0} is not a partitioned table", true),
   DATABASE_ALREADY_EXISTS(10242, "Database {0} already exists", true),
   CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
-  BAD_LOCATION_VALUE(10244, "{0}  is not absolute.  Please specify a complete absolute uri."),
-  UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+  BAD_LOCATION_VALUE(10244, "{0}  is not absolute.  Please specify a complete absolute uri.", true),
+  UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported", true),
   INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
   MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
   PARTITION_SPEC_TYPE_MISMATCH(10248, "Cannot add partition column {0} of type {1} as it cannot be converted to type {2}", true),
@@ -505,18 +505,8 @@ public enum ErrorMsg {
           " queue: {1}. Please fix and try again.", true),
   SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
 
-  //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
-  //method in HiveMetaStoreClient
-  REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."),
-  REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Load path {0} not valid as target database is bootstrapped " +
-          "from some other path : {1}."),
-  REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."),
-  REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
-  REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
-          "Source of replication (repl.source.for) is not set in the database properties."),
-  REPL_INVALID_DB_OR_TABLE_PATTERN(20021,
-          "Invalid pattern for the DB or table name in the replication policy. "
-                  + "It should be a valid regex enclosed within single or double quotes."),
+  REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20016, "File is missing from both source and cm path."),
+  REPL_EXTERNAL_SERVICE_CONNECTION_ERROR(20017, "Failed to connect to {0} service. Error code {1}.",true),
 
   // An exception from runtime that will show the full stack to client
   UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),
@@ -605,9 +595,10 @@ public enum ErrorMsg {
 
   SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"),
   SPARK_GET_JOB_INFO_INTERRUPTED(30045, "Spark job was interrupted while getting job info"),
-  SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"),
+  SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}", true),
 
-  REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),
+  REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired. Error {0}",
+    true),
   SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true),
   SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."),
   SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true),
@@ -616,7 +607,20 @@ public enum ErrorMsg {
 
   SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
   SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true),
-  REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.")
+  REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication."),
+  REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(40004,
+                                               "Source of replication (repl.source.for) is not set in the database properties."),
+  REPL_INVALID_DB_OR_TABLE_PATTERN(40005,
+                                     "Invalid pattern for the DB or table name in the replication policy. "
+                                     + "It should be a valid regex enclosed within single or double quotes."),
+  //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
+  //method in HiveMetaStoreClient
+  REPL_EVENTS_MISSING_IN_METASTORE(40006, "Notification events are missing in the meta store."),
+  REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(40007, "Load path {0} not valid as target database is bootstrapped " +
+    "from some other path : {1}.", true),
+  REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true),
+  REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true),
+  REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true)
   ;
 
   private int errorCode;
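
For reference (illustrative, not part of the patch), the renumbered codes keep the
existing parameterized-message machinery: format() fills the {n} placeholders and
getErrorMsg() maps a formatted message back to its code, which is what the updated
tests below rely on. A small sketch, assuming ReplUtils.REPL_ATLAS_SERVICE resolves
to "atlas" as the test expectations suggest:

    String msg = ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
        .format("baseUrls is not set.", "atlas");
    // msg reads "Invalid config error : baseUrls is not set. for atlas service."
    int code = ErrorMsg.getErrorMsg(msg).getErrorCode();
    // code is 40008, i.e. ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode()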
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index cd48d4b..b1c02e5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -1957,6 +1957,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
       replica.dump(replicatedDbName);
     } catch (Exception e) {
       Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage());
+      Assert.assertEquals(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getErrorCode(),
+        ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
     }
     replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')");
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index b4666ba..d735c9b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -1600,6 +1602,27 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] {"1", "2"});
   }
 
+  @Test
+  public void testRangerReplicationRetryExhausted() throws Throwable {
+    List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
+      "'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION
+        + "'='30s'", "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL + "'='false'", "'" + HiveConf.ConfVars.HIVE_IN_TEST
+        + "'='false'");
+    try {
+      primary.run("use " + primaryDbName)
+        .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+          "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, clause);
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode(),
+        ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
+    }
+  }
+
   /*
   Can't test complete replication as mini ranger is not supported
   Testing just the configs and no impact on existing replication
@@ -1617,7 +1640,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     try {
       primary.dump(primaryDbName, clause);
     } catch (SemanticException e) {
-      assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
+      assertEquals("Invalid config error : Authorizer sentry not supported for replication  " +
+        "for ranger service.", e.getMessage());
+      assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(),
+        ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
     }
   }
 
@@ -1732,7 +1758,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
       }
       Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
     } catch (SemanticException e) {
-      assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication"));
+      assertEquals(e.getMessage(), ("Invalid config error : " + conf
+        + " is mandatory config for Atlas metadata replication for atlas service."));
     }
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 3bc63a3..bd2a439 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -415,6 +415,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
         LOG.info("Got exception: {}", ex.getMessage());
         Assert.assertTrue(ex instanceof SemanticException);
         Assert.assertTrue(ex.getMessage().equals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg()));
+        Assert.assertEquals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getErrorCode(),
+          ErrorMsg.getErrorMsg(ex.getMessage()).getErrorCode());
         failed = true;
       }
       Assert.assertTrue(failed);
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index fb38b8c..f4e7c4f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3120,13 +3120,6 @@ public class TestJdbcDriver2 {
       assertTrue(e.getErrorCode() == ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getErrorCode());
     }
 
-    try {
-      // invalid load path
-      stmt.execute("repl load default into default1");
-    } catch(SQLException e){
-      assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode());
-    }
-
     stmt.close();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index e6384af..8020fa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -46,6 +47,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.FileNotFoundException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -132,31 +134,43 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
 
   private long lastStoredTimeStamp() throws SemanticException {
     Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
-    BufferedReader br = null;
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class)
+      .withFailOnException(FileNotFoundException.class).build();
     try {
-      FileSystem fs = prevMetadataPath.getFileSystem(conf);
-      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
-      String line = br.readLine();
-      if (line == null) {
-        throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
-      }
-      String[] lineContents = line.split("\t", 5);
-      return Long.parseLong(lineContents[1]);
-    } catch (Exception ex) {
-      throw new SemanticException(ex);
-    } finally {
-      if (br != null) {
+      return retryable.executeCallable(() -> {
+        BufferedReader br = null;
         try {
-          br.close();
-        } catch (IOException e) {
-          throw new SemanticException(e);
+          FileSystem fs = prevMetadataPath.getFileSystem(conf);
+          br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+          String line = br.readLine();
+          if (line == null) {
+            throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE
+              .format("Could not read lastStoredTimeStamp from atlas metadata file",
+                ReplUtils.REPL_ATLAS_SERVICE));
+          }
+          String[] lineContents = line.split("\t", 5);
+          return Long.parseLong(lineContents[1]);
+        } finally {
+          if (br != null) {
+            try {
+              br.close();
+            } catch (IOException e) {
+              //Do nothing
+            }
+          }
         }
-      }
+      });
+    } catch (SemanticException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
   private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
-    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster(), conf);
     long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
             ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
     LOG.debug("Current timestamp is: {}", ret);
@@ -177,13 +191,13 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     } catch (SemanticException ex) {
       throw ex;
     } catch (Exception ex) {
-      throw new SemanticException(ex);
+      throw new SemanticException(ex.getMessage(), ex);
     } finally {
       if (inputStream != null) {
         try {
           inputStream.close();
         } catch (IOException e) {
-          throw new SemanticException(e);
+          //Do nothing
         }
       }
     }
@@ -196,12 +210,14 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
     Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
     if (entries == null || entries.isEmpty()) {
-      throw new SemanticException("Could find entries in objectId for:" + clusterName);
+      throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Could find " +
+        "entries in objectId for:" + clusterName, ReplUtils.REPL_ATLAS_SERVICE));
     }
     Map.Entry<String, Object> item = entries.iterator().next();
     String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue());
     if (guid == null || guid.isEmpty()) {
-      throw new SemanticException("Entity not found:" + objectId);
+      throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE
+        .format("Entity not found:" + objectId, ReplUtils.REPL_ATLAS_SERVICE));
     }
     return guid;
   }
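
The hand-rolled retry loops removed above are replaced by the existing time-based
Retryable helper (org.apache.hadoop.hive.ql.exec.util.Retryable). A minimal sketch of
the pattern used throughout this patch, with a hypothetical readFirstLine helper (not
part of the commit):

    // Retries IOExceptions within the HiveConf-configured time budget;
    // FileNotFoundException fails immediately instead of being retried.
    private String readFirstLine(Path path, HiveConf conf) throws SemanticException {
      Retryable retryable = Retryable.builder()
        .withHiveConf(conf)
        .withRetryOnException(IOException.class)
        .withFailOnException(FileNotFoundException.class).build();
      try {
        return retryable.executeCallable(() -> {
          FileSystem fs = path.getFileSystem(conf);
          try (BufferedReader br = new BufferedReader(
              new InputStreamReader(fs.open(path), Charset.defaultCharset()))) {
            return br.readLine();
          }
        });
      } catch (Exception e) {
        // once the retry budget is spent, the failure surfaces as REPL_RETRY_EXHAUSTED (40010)
        throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
      }
    }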
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index b24b3d6..1506db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
@@ -48,6 +49,7 @@ import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * Atlas Metadata Replication Load Task.
@@ -113,26 +115,30 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
 
   private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
     Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
-    BufferedReader br = null;
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class).build();
     try {
-      FileSystem fs = metadataPath.getFileSystem(conf);
-      br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
-      String line = br.readLine();
-      if (line == null) {
-        throw new SemanticException("Could not read stored src FS Uri from atlas metadata file");
-      }
-      String[] lineContents = line.split("\t", 5);
-      return lineContents[0];
-    } catch (Exception ex) {
-      throw new SemanticException(ex);
-    } finally {
-      if (br != null) {
+      return retryable.executeCallable(() -> {
+        BufferedReader br = null;
         try {
-          br.close();
-        } catch (IOException e) {
-          throw new SemanticException(e);
+          FileSystem fs = metadataPath.getFileSystem(conf);
+          br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+          String line = br.readLine();
+          if (line == null) {
+            throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Could not read stored " +
+              "src FS Uri from atlas metadata file", ReplUtils.REPL_ATLAS_SERVICE));
+          }
+          String[] lineContents = line.split("\t", 5);
+          return lineContents[0];
+        } finally {
+          if (br != null) {
+            br.close();
+          }
         }
-      }
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
index e8a8df1..5ed09f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -43,7 +44,6 @@ import java.util.Collections;
  */
 public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
-  private static final int MAX_COPY_RETRY = 5;
 
   private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
     FileSystem targetFs = destPath.getFileSystem(conf);
@@ -51,7 +51,8 @@ public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
     if (!targetFs.exists(destPath)) {
       // target path is created even if the source path is missing, so that ddl task does not try to create it.
       if (!targetFs.mkdirs(destPath)) {
-        throw new IOException(destPath + " is not a directory or unable to create one");
+        throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.format(
+          destPath + " is not a directory or unable to create one"));
       }
       createdDir = true;
     }
@@ -86,107 +87,61 @@ public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
     return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> sourcePath.getFileSystem(conf).exists(sourcePath));
   }
 
-  private int handleException(Exception e, Path sourcePath, Path targetPath,
-                              int currentRetry, UserGroupInformation proxyUser) {
-    try {
-      LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
-      if (!checkIfPathExist(sourcePath, proxyUser)) {
-        LOG.info("Source path is missing. Ignoring exception.");
-        return 0;
-      }
-    } catch (Exception ex) {
-      LOG.warn("Source path missing check failed. ", ex);
-    }
-    // retry logic only for i/o exception
-    if (!(e instanceof IOException)) {
-      LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-    }
-
-    if (currentRetry <= MAX_COPY_RETRY) {
-      LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
-    } else {
-      LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
-      setException(e);
-      return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
-    }
-    int sleepTime = FileUtils.getSleepTime(currentRetry);
-    LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException timerEx) {
-      LOG.info("Sleep interrupted", timerEx.getMessage());
-    }
-    try {
-      if (proxyUser == null) {
-        proxyUser = Utils.getUGI();
-      }
-      FileSystem.closeAllForUGI(proxyUser);
-    } catch (Exception ex) {
-      LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
-    }
-    return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-  }
-
   @Override
   public int execute() {
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class).build();
+    try {
+      return retryable.executeCallable(() -> {
+        UserGroupInformation proxyUser = null;
+        Path sourcePath = work.getFullyQualifiedSourcePath();
+        Path targetPath = work.getFullyQualifiedTargetPath();
+        try {
+          if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+            sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
+            targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
+          }
+          UserGroupInformation ugi = Utils.getUGI();
+          String currentUser = ugi.getShortUserName();
+          if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+            proxyUser = UserGroupInformation.createProxyUser(
+              distCpDoAsUser, UserGroupInformation.getLoginUser());
+          }
 
-    Path sourcePath = work.getFullyQualifiedSourcePath();
-    Path targetPath = work.getFullyQualifiedTargetPath();
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
-      sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
-      targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
-    }
-    int currentRetry = 0;
-    int error = 0;
-    UserGroupInformation proxyUser = null;
-    while (currentRetry <= MAX_COPY_RETRY) {
-      try {
-        UserGroupInformation ugi = Utils.getUGI();
-        String currentUser = ugi.getShortUserName();
-        if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
-          proxyUser = UserGroupInformation.createProxyUser(
-                  distCpDoAsUser, UserGroupInformation.getLoginUser());
-        }
-
-        setTargetPathOwner(targetPath, sourcePath, proxyUser);
-
-        // do we create a new conf and only here provide this additional option so that we get away from
-        // differences of data in two location for the same directories ?
-        // basically add distcp.options.delete to hiveconf new object ?
-        FileUtils.distCp(
-                sourcePath.getFileSystem(conf), // source file system
-                Collections.singletonList(sourcePath),  // list of source paths
-                targetPath,
-                false,
-                proxyUser,
-                conf,
-                ShimLoader.getHadoopShims());
-        return 0;
-      } catch (Exception e) {
-        currentRetry++;
-        error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
-        if (error == 0) {
-          return 0;
-        }
-      } finally {
-        if (proxyUser != null) {
+          setTargetPathOwner(targetPath, sourcePath, proxyUser);
           try {
-            FileSystem.closeAllForUGI(proxyUser);
-          } catch (IOException e) {
-            LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
-            if (error == 0) {
-              setException(e);
-              error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+            if (!checkIfPathExist(sourcePath, proxyUser)) {
+              LOG.info("Source path is missing. Ignoring exception.");
+              return 0;
             }
-            break;
+          } catch (Exception ex) {
+            LOG.warn("Source path missing check failed. ", ex);
+            //Should be retried
+            throw new IOException(ex);
+          }
+          // do we create a new conf and only here provide this additional option so that we get away from
+          // differences of data in two location for the same directories ?
+          // basically add distcp.options.delete to hiveconf new object ?
+          FileUtils.distCp(
+            sourcePath.getFileSystem(conf), // source file system
+            Collections.singletonList(sourcePath),  // list of source paths
+            targetPath,
+            false,
+            proxyUser,
+            conf,
+            ShimLoader.getHadoopShims());
+          return 0;
+        } finally {
+          if (proxyUser != null) {
+            FileSystem.closeAllForUGI(proxyUser);
           }
         }
-      }
+      });
+    } catch (Exception e) {
+      throw new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
-    return error;
   }
 
   private static Path reservedRawPath(URI uri) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 4c1a170..0576ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -92,14 +92,22 @@ public class RangerDumpTask extends Task<RangerDumpWork> implements Serializable
       }
       URL url = work.getRangerConfigResource();
       if (url == null) {
-        throw new SemanticException("Ranger configuration is not valid "
-          + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
+        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+          .format("Ranger configuration is not valid "
+          + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, ReplUtils.REPL_RANGER_SERVICE));
       }
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
-        throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
+      if (StringUtils.isEmpty(rangerEndpoint)) {
+        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+          .format("Ranger endpoint is not valid "
+            + rangerEndpoint, ReplUtils.REPL_RANGER_SERVICE));
+      }
+      if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) {
+        throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format(ReplUtils
+            .REPL_RANGER_SERVICE,
+          "Ranger endpoint is not reachable " + rangerEndpoint));
       }
       RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
               work.getDbName(), rangerHiveServiceName, conf);
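
Illustration (not part of the patch): the Ranger tasks now separate configuration
problems from connectivity problems, so the user-visible messages carry different
codes. Assuming ReplUtils.REPL_RANGER_SERVICE resolves to "ranger", as the updated
test expectations above suggest (the endpoint URL below is a placeholder):

    // Endpoint missing or blank -> user-fixable config error, code 40008:
    ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
        .format("Ranger endpoint is not valid null", "ranger");
    // reads roughly "Invalid config error : Ranger endpoint is not valid null for ranger service."

    // Endpoint set but unreachable -> external service connection error, code 20017:
    ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR
        .format("ranger", "Ranger endpoint is not reachable http://ranger-host:6080");
    // reads roughly "Failed to connect to ranger service. Error code Ranger endpoint
    // is not reachable http://ranger-host:6080."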
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index 3c7e9e24..240caf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -89,14 +89,22 @@ public class RangerLoadTask extends Task<RangerLoadWork> implements Serializable
       }
       URL url = work.getRangerConfigResource();
       if (url == null) {
-        throw new SemanticException("Ranger configuration is not valid "
-          + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
+        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+          .format("Ranger configuration is not valid "
+            + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, ReplUtils.REPL_RANGER_SERVICE));
       }
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
-        throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
+      if (StringUtils.isEmpty(rangerEndpoint)) {
+        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+          .format("Ranger endpoint is not valid "
+            + rangerEndpoint, ReplUtils.REPL_RANGER_SERVICE));
+      }
+      if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) {
+        throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format(ReplUtils
+            .REPL_RANGER_SERVICE,
+          "Ranger endpoint is not valid " + rangerEndpoint));
       }
       if (work.getCurrentDumpPath() != null) {
         LOG.info("Importing Ranger Metadata from {} ", work.getCurrentDumpPath());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index dc8d790..214c12d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.Constants;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
-import org.apache.hadoop.hive.metastore.utils.Retry;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -87,12 +85,10 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.login.LoginException;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -218,8 +214,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       }
       childTasks.add(rangerDumpTask);
     } else {
-      throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
-              + " not supported for replication ");
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer "
+        + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+              + " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE));
     }
   }
 
@@ -647,6 +644,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     return currentEventMaxLimit;
   }
+
   private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException {
     Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
     Retryable retryable = Retryable.builder()
@@ -668,22 +666,29 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   }
 
   private long getResumeFrom(Path ackFile) throws SemanticException {
-    BufferedReader br = null;
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(Exception.class).build();
     try {
-      FileSystem fs = ackFile.getFileSystem(conf);
-      br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset()));
-      long lastEventID = Long.parseLong(br.readLine());
-      return lastEventID;
-    } catch (Exception ex) {
-      throw new SemanticException(ex);
-    } finally {
-      if (br != null) {
+      return retryable.executeCallable(() -> {
+        BufferedReader br = null;
         try {
-          br.close();
-        } catch (IOException e) {
-          throw new SemanticException(e);
+          FileSystem fs = ackFile.getFileSystem(conf);
+          br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset()));
+          long lastEventID = Long.parseLong(br.readLine());
+          return lastEventID;
+        } finally {
+          if (br != null) {
+            try {
+              br.close();
+            } catch (Exception e) {
+              //Do nothing
+            }
+          }
         }
-      }
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
@@ -729,7 +734,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   }
 
   private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, String dbName,
-                                           HiveConf hiveConf) throws IOException, LoginException {
+                                           HiveConf hiveConf) throws Exception {
     // Empty list will create an empty file to distinguish it from db level replication. If no file is there, that means
     // db level replication. If empty file is there, means no table satisfies the policy.
     if (tableList == null) {
@@ -738,12 +743,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
 
     // The table list is dumped in _tables/dbname file
-    Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
-    tableListFile = new Path(tableListFile, dbName.toLowerCase());
-
-    int count = 0;
-    while (count < FileUtils.MAX_IO_ERROR_RETRY) {
-      try (FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile)) {
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
+        Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+        tableListFile = new Path(tableListFile, dbName.toLowerCase());
+        FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile);
         for (String tableName : tableList) {
           String line = tableName.toLowerCase().concat("\n");
           writer.write(line.getBytes(StandardCharsets.UTF_8));
@@ -751,27 +758,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         // Close is called explicitly as close also calls the actual file system write,
         // so there is chance of i/o exception thrown by close.
         writer.close();
-        break;
-      } catch (IOException e) {
-        LOG.info("File operation failed", e);
-        if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) {
-          //no need to wait in the last iteration
-          LOG.error("File " + tableListFile.toUri() + " creation failed even after " +
-                  FileUtils.MAX_IO_ERROR_RETRY + " attempts.");
-          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
-        }
-        int sleepTime = FileUtils.getSleepTime(count);
-        LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1));
-        try {
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException timerEx) {
-          LOG.info("Sleep interrupted", timerEx.getMessage());
-        }
-        FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
-      }
-      count++;
+        LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
+        return null;
+      });
+    } catch (Exception e) {
+      FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
-    LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
   }
 
   Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
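
For write paths that return nothing, the same wrapper is applied with a Callable<Void>,
as in dumpTableListToDumpLocation above. A minimal sketch, with a hypothetical ack-file
write (ackPath and its contents are assumptions, not from the patch):

    Retryable retryable = Retryable.builder()
      .withHiveConf(conf)
      .withRetryOnException(IOException.class).build();
    try {
      retryable.executeCallable((Callable<Void>) () -> {
        FSDataOutputStream out = FileSystem.get(conf).create(ackPath);
        out.write("done".getBytes(StandardCharsets.UTF_8));
        // close explicitly: close() flushes and can itself throw, which should also be retried
        out.close();
        return null;
      });
    } catch (Exception e) {
      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
    }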
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index b6e0858..796cfae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -48,6 +50,7 @@ import java.util.Base64;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /**
  * Format of the file used to dump information about external tables:
@@ -62,7 +65,6 @@ public final class ReplExternalTables {
   private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class);
   private static final String FIELD_SEPARATOR = ",";
   public static final String FILE_NAME = "_external_tables_info";
-  private static final int MAX_RETRIES = 5;
 
   private ReplExternalTables(){}
 
@@ -79,9 +81,9 @@ public final class ReplExternalTables {
     String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
     URI baseDirUri  = StringUtils.isEmpty(baseDir) ? null : new Path(baseDir).toUri();
     if (baseDirUri == null || baseDirUri.getScheme() == null || baseDirUri.getAuthority() == null) {
-      throw new SemanticException(
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
               String.format("Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required %s",
-                      baseDir == null ? "" : "- ('" + baseDir + "')"));
+                      baseDir == null ? "" : "- ('" + baseDir + "')"), ReplUtils.REPL_HIVE_SERVICE));
     }
     return new Path(baseDirUri);
   }
@@ -102,7 +104,8 @@ public final class ReplExternalTables {
               basePath.getFileSystem(hiveConf)
       );
     } catch (IOException e) {
-      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
+        ErrorMsg.INVALID_PATH.getMsg(), ReplUtils.REPL_HIVE_SERVICE), e);
     }
     return dataPath;
   }
@@ -196,23 +199,22 @@ public final class ReplExternalTables {
       return lineToWrite.toString();
     }
 
-    private void write(String line) throws InterruptedException {
-      int currentRetry = 0;
-      while (currentRetry < MAX_RETRIES) {
-        try {
-          writer.write(line.getBytes(StandardCharsets.UTF_8));
-          break;
-        } catch (IOException e) {
-          currentRetry++;
-          if (currentRetry < MAX_RETRIES) {
-            LOG.warn("failed to write data with maxRetries {} due to", currentRetry, e);
-          } else {
-            LOG.error("failed to write data with maxRetries {} due to", currentRetry, e);
-            throw new RuntimeException("failed to write data", e);
+    private void write(String line) throws SemanticException {
+      Retryable retryable = Retryable.builder()
+        .withHiveConf(hiveConf)
+        .withRetryOnException(IOException.class).build();
+      try {
+        retryable.executeCallable((Callable<Void>) () -> {
+          try {
+            writer.write(line.getBytes(StandardCharsets.UTF_8));
+          } catch (IOException e) {
+            writer = openWriterAppendMode();
+            throw e;
           }
-          Thread.sleep(100 * currentRetry * currentRetry);
-          writer = openWriterAppendMode();
-        }
+          return null;
+        });
+      } catch (Exception e) {
+        throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       }
     }
 
@@ -288,33 +290,27 @@ public final class ReplExternalTables {
       if (!fileSystem.exists(externalTableInfo)) {
         return locationsToCopy;
       }
-
-      int currentRetry = 0;
-      BufferedReader reader = null;
-      while (currentRetry < MAX_RETRIES) {
-        try {
-          reader = reader(fileSystem, externalTableInfo);
-          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
-            String[] splits = line.split(FIELD_SEPARATOR);
-            locationsToCopy
+      Retryable retryable = Retryable.builder()
+        .withHiveConf(hiveConf)
+        .withRetryOnException(IOException.class).build();
+      try {
+        return retryable.executeCallable(() -> {
+          BufferedReader reader = null;
+          try {
+            reader = reader(fileSystem, externalTableInfo);
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+              String[] splits = line.split(FIELD_SEPARATOR);
+              locationsToCopy
                 .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8));
-          }
-          return locationsToCopy;
-        } catch (IOException e) {
-          currentRetry++;
-          if (currentRetry < MAX_RETRIES) {
+            }
+            return locationsToCopy;
+          } finally {
             closeQuietly(reader);
-            LOG.warn("failed to read {}", externalTableInfo.toString(), e);
-          } else {
-            LOG.error("failed to read {}", externalTableInfo.toString(), e);
-            throw e;
           }
-        } finally {
-          closeQuietly(reader);
-        }
+        });
+      } catch (Exception e) {
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       }
-      // we should never reach here
-      throw new IllegalStateException("we should never reach this condition");
     }
 
     private static void closeQuietly(BufferedReader reader) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3c3dc44..8029b72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -161,8 +161,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
       childTasks.add(rangerLoadTask);
     } else {
-      throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
-              + " not supported for replication ");
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " +
+        conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+              + " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE));
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
index dd72f83..e49664d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.InputStream;
 
@@ -30,14 +31,14 @@ import java.io.InputStream;
  */
 public interface AtlasRestClient {
 
-  InputStream exportData(AtlasExportRequest request) throws Exception;
+  InputStream exportData(AtlasExportRequest request) throws SemanticException;
 
   AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception;
 
-  AtlasServer getServer(String endpoint) throws SemanticException;
+  AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException;
 
   String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName)
           throws SemanticException;
 
-  boolean getStatus() throws SemanticException;
+  boolean getStatus(HiveConf conf) throws SemanticException;
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
index 6f24f57..6c609aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -23,6 +23,8 @@ import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -64,7 +66,8 @@ public class AtlasRestClientBuilder {
 
   private AtlasRestClient create() throws SemanticException {
     if (baseUrls == null || baseUrls.length == 0) {
-      throw new SemanticException("baseUrls is not set.");
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("baseUrls is not set.",
+        ReplUtils.REPL_ATLAS_SERVICE));
     }
     setUGInfo();
     initializeAtlasApplicationProperties();
@@ -92,7 +95,8 @@ public class AtlasRestClientBuilder {
       props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true");
       ApplicationProperties.set(ConfigurationConverter.getConfiguration(props));
     } catch (AtlasException e) {
-      throw new SemanticException(e);
+      throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format(e.getMessage(),
+        ReplUtils.REPL_ATLAS_SERVICE), e);
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index 71e51fb..e4b294d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +87,7 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
     }
   }
 
-  public InputStream exportData(AtlasExportRequest request) throws Exception {
+  public InputStream exportData(AtlasExportRequest request) throws SemanticException {
     LOG.debug("exportData: {}" + request);
     return invokeWithRetry(new Callable<InputStream>() {
       @Override
@@ -125,17 +127,15 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
     return new AtlasImportResult(request, "", "", "", 0L);
   }
 
-  public AtlasServer getServer(String endpoint) throws SemanticException {
+  public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException {
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(AtlasServiceException.class).build();
     try {
-      return clientV2.getServer(endpoint);
-    } catch (AtlasServiceException e) {
-      int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
-      if (statusCode != NOT_FOUND.getStatusCode()) {
-        throw new SemanticException("Exception while getServer ", e.getCause());
-      }
-      LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage());
+      return retryable.executeCallable(() -> clientV2.getServer(endpoint));
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
-    return null;
   }
 
   public String getEntityGuid(final String entityType,
@@ -149,12 +149,7 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
 
     try {
       AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout(
-          new Callable<AtlasEntity.AtlasEntityWithExtInfo>() {
-            @Override
-            public AtlasEntity.AtlasEntityWithExtInfo call() throws Exception {
-              return clientV2.getEntityByAttribute(entityType, attributes);
-            }
-          }, entityApiTimeOut, TimeUnit.SECONDS);
+        () -> clientV2.getEntityByAttribute(entityType, attributes), entityApiTimeOut, TimeUnit.SECONDS);
 
       if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) {
         LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}",
@@ -165,7 +160,8 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
     } catch (AtlasServiceException e) {
       int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
       if (statusCode != NOT_FOUND.getStatusCode()) {
-        throw new SemanticException("Exception while getEntityGuid ", e.getCause());
+        throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Exception " +
+          "while getEntityGuid ", ReplUtils.REPL_ATLAS_SERVICE), e.getCause());
       }
       LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}",
               entityType, attributeName, qualifiedName, e.getMessage());
@@ -175,11 +171,14 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
     }
   }
 
-  public boolean getStatus() throws SemanticException {
+  public boolean getStatus(HiveConf conf) throws SemanticException {
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(Exception.class).build();
     try {
-      return clientV2.isServerReady();
-    } catch (AtlasServiceException e) {
-      throw new SemanticException(e.getCause());
+      return retryable.executeCallable(() -> clientV2.isServerReady());
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 }
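
[Editor's note] The hunks above replace the hand-rolled error handling in getServer() and getStatus() with the shared Retryable utility and classify exhausted retries under ErrorMsg.REPL_RETRY_EXHAUSTED. A minimal sketch of that retry-then-classify shape, assuming only the Retryable/ErrorMsg imports added in this patch; the helper name below is hypothetical and the body mirrors AtlasRestClientImpl.getServer():

    // Sketch only (not part of the commit): retry an Atlas call, then surface a
    // classified replication error once the retry budget is exhausted.
    private AtlasServer getServerWithRetry(String endpoint, HiveConf conf) throws SemanticException {
      Retryable retryable = Retryable.builder()
        .withHiveConf(conf)                                // retry window/back-off read from HiveConf
        .withRetryOnException(AtlasServiceException.class) // retry only Atlas service failures
        .build();
      try {
        return retryable.executeCallable(() -> clientV2.getServer(endpoint));
      } catch (Exception e) {
        // Retries exhausted: report a classified, user-visible error instead of the raw cause.
        throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
      }
    }

The same conversion is applied to the Ranger client and to CopyUtils.getProxyUser() later in this patch; the only difference is the exception type retried on (IOException for file-system/UGI calls) and, for CopyUtils, re-wrapping the exhausted-retry error as an IOException to preserve the method's signature.
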
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
index 59dcbf7..d88fe64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.conf.HiveConf;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -42,7 +43,7 @@ public class NoOpAtlasRestClient implements AtlasRestClient {
     return new AtlasImportResult(request, "", "", "", 0L);
   }
 
-  public AtlasServer getServer(String endpoint) {
+  public AtlasServer getServer(String endpoint, HiveConf conf) {
     return new AtlasServer();
   }
 
@@ -51,7 +52,7 @@ public class NoOpAtlasRestClient implements AtlasRestClient {
     return UUID.randomUUID().toString();
   }
 
-  public boolean getStatus() {
+  public boolean getStatus(HiveConf conf) {
     return true;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
index 25471a4..6ddb114 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.repl.atlas;
 
 import com.sun.jersey.api.client.UniformInterfaceException;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +43,7 @@ public class RetryingClientTimeBased {
   protected double backOff;
   protected int maxJitterInSeconds;
 
-  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws SemanticException {
     long startTime = System.currentTimeMillis();
     long delay = this.initialDelayInSeconds;
     while (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
@@ -58,7 +60,7 @@ public class RetryingClientTimeBased {
           return null;
         }
         LOG.error(func.getClass().getName(), e);
-        throw new Exception(e);
+        throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
       }
     }
     return defaultReturnValue;
@@ -100,7 +102,7 @@ public class RetryingClientTimeBased {
             || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
   }
 
-  private boolean processImportExportLockException(Exception e, long delay) throws Exception {
+  private boolean processImportExportLockException(Exception e, long delay) throws SemanticException {
     if (!(e instanceof AtlasServiceException)) {
       return false;
     }
@@ -111,7 +113,7 @@ public class RetryingClientTimeBased {
         Thread.sleep(delay);
       } catch (InterruptedException intEx) {
         LOG.error("Pause wait interrupted!", intEx);
-        throw new Exception(intEx);
+        throw new SemanticException(intEx);
       }
       return true;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index adaaa02..87a2395 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.Retry;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -83,7 +85,8 @@ public class RangerRestClientImpl implements RangerRestClient {
                                                      HiveConf hiveConf)throws SemanticException {
     LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint);
     if (StringUtils.isEmpty(rangerHiveServiceName)) {
-      throw new SemanticException("Ranger Service Name cannot be empty");
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service Name " +
+        "cannot be empty", ReplUtils.REPL_RANGER_SERVICE));
     }
     Retryable retryable = Retryable.builder()
       .withHiveConf(hiveConf)
@@ -92,7 +95,7 @@ public class RangerRestClientImpl implements RangerRestClient {
       return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName,
         dbName));
     } catch (Exception e) {
-      throw new SemanticException(e);
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
@@ -361,7 +364,7 @@ public class RangerRestClientImpl implements RangerRestClient {
       return retryable.executeCallable(() -> writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName,
         stagingDirPath, conf));
     } catch (Exception e) {
-      throw new SemanticException(e);
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
@@ -392,7 +395,7 @@ public class RangerRestClientImpl implements RangerRestClient {
     try {
       return retryable.executeCallable(() -> checkConnectionPlain(url));
     } catch (Exception e) {
-      throw new SemanticException(e);
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
@@ -408,7 +411,8 @@ public class RangerRestClientImpl implements RangerRestClient {
   public List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName,
                                             String sourceDb, String targetDb) throws SemanticException {
     if (StringUtils.isEmpty(rangerServiceName)) {
-      throw new SemanticException("Ranger Service Name cannot be empty");
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service " +
+        "Name cannot be empty", ReplUtils.REPL_RANGER_SERVICE));
     }
     RangerPolicy denyRangerPolicy = new RangerPolicy();
     denyRangerPolicy.setService(rangerServiceName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index d531f5b..137cc29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -131,6 +131,15 @@ public class ReplUtils {
   public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml";
 
   public static final String TARGET_OF_REPLICATION = "repl.target.for";
+
+  // Service name for hive.
+  public static final String REPL_HIVE_SERVICE = "hive";
+
+  // Service name for ranger.
+  public static final String REPL_RANGER_SERVICE = "ranger";
+
+  // Service name for atlas.
+  public static final String REPL_ATLAS_SERVICE = "atlas";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -224,7 +233,8 @@ public class ReplUtils {
           throws SemanticException {
     String val = hiveConf.get(configParam);
     if (StringUtils.isEmpty(val)) {
-      throw new SemanticException(String.format(errorMsgFormat, configParam));
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(String.format(
+        errorMsgFormat, configParam), ReplUtils.REPL_ATLAS_SERVICE));
     }
     return val;
   }
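
[Editor's note] The new per-service constants in ReplUtils let a single classified error code also say which replication sub-service (hive, ranger, atlas) failed. Roughly, REPL_INVALID_CONFIG_FOR_SERVICE marks user-fixable configuration problems, REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE marks internal misconfiguration, and REPL_RETRY_EXHAUSTED marks transient failures that outlived the retry budget. The three shapes, excerpted from the hunks in this patch (not a compilable unit; `e` is the caught exception in each case, and the exact message templates live in ErrorMsg and are not shown in this diff):

    // 1. User-fixable configuration problem, attributed to the Ranger service:
    throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
      "Ranger Service Name cannot be empty", ReplUtils.REPL_RANGER_SERVICE));

    // 2. Internal/unexpected configuration problem, attributed to the Atlas service:
    throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format(
      e.getMessage(), ReplUtils.REPL_ATLAS_SERVICE), e);

    // 3. Transient failure that outlived the retry budget:
    throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
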
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 3fb271d..c386aee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -27,7 +27,9 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +45,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 public class CopyUtils {
@@ -297,39 +300,27 @@ public class CopyUtils {
     return false;
   }
 
-  private UserGroupInformation getProxyUser() throws LoginException, IOException {
+  private UserGroupInformation getProxyUser() throws IOException {
     if (copyAsUser == null) {
       return null;
     }
-    UserGroupInformation proxyUser = null;
-    int currentRetry = 0;
-    while (currentRetry <= MAX_IO_RETRY) {
-      try {
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(hiveConf)
+      .withRetryOnException(IOException.class).build();
+    try {
+      return retryable.executeCallable(() -> {
+        UserGroupInformation proxyUser = null;
         UserGroupInformation ugi = Utils.getUGI();
         String currentUser = ugi.getShortUserName();
         if (!currentUser.equals(copyAsUser)) {
           proxyUser = UserGroupInformation.createProxyUser(
-                  copyAsUser, UserGroupInformation.getLoginUser());
+            copyAsUser, UserGroupInformation.getLoginUser());
         }
         return proxyUser;
-      } catch (IOException e) {
-        currentRetry++;
-        if (currentRetry <= MAX_IO_RETRY) {
-          LOG.warn("Unable to get UGI info", e);
-        } else {
-          LOG.error("Unable to get UGI info", e);
-          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
-        }
-        int sleepTime = FileUtils.getSleepTime(currentRetry);
-        LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
-        try {
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException timerEx) {
-          LOG.info("Sleep interrupted", timerEx.getMessage());
-        }
-      }
+      });
+    } catch (Exception e) {
+      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
-    return null;
   }
 
   // Copy without retry
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
index dbbbf2c..c51618b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
 import com.google.gson.Gson;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
 import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
 import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
@@ -79,7 +80,7 @@ public class TestRangerLoadTask {
   @Test
   public void testFailureInvalidAuthProviderEndpoint() {
     int status = task.execute();
-    Assert.assertEquals(40000, status);
+    Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), status);
   }
 
   @Test
diff --git a/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out b/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
index bd8892a..54b8a3f 100644
--- a/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
+++ b/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
@@ -11,4 +11,4 @@ PREHOOK: type: ALTERTABLE_LOCATION
 PREHOOK: Input: default@testwrongloc
 PREHOOK: Output: default@testwrongloc
 #### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.ddl.DDLTask. {0}  is not absolute.  Please specify a complete absolute uri. relative/testwrongloc
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.ddl.DDLTask. relative/testwrongloc  is not absolute.  Please specify a complete absolute uri.