Posted to commits@hive.apache.org by an...@apache.org on 2020/08/10 05:41:08 UTC
[hive] branch master updated: HIVE-23955: Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new aebb74e HIVE-23955:Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
aebb74e is described below
commit aebb74ea361b5018ffde96ae7ddfb4ee899d5329
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Aug 10 11:10:56 2020 +0530
HIVE-23955:Classification of Error Codes in Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../org/apache/hadoop/hive/common/FileUtils.java | 1 -
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 38 +++---
.../parse/TestReplicationScenariosAcidTables.java | 2 +
.../TestReplicationScenariosAcrossInstances.java | 31 ++++-
.../parse/TestTableLevelReplicationScenarios.java | 2 +
.../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 7 -
.../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 60 +++++----
.../hadoop/hive/ql/exec/repl/AtlasLoadTask.java | 40 +++---
.../hadoop/hive/ql/exec/repl/DirCopyTask.java | 143 +++++++--------------
.../hadoop/hive/ql/exec/repl/RangerDumpTask.java | 16 ++-
.../hadoop/hive/ql/exec/repl/RangerLoadTask.java | 16 ++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 85 ++++++------
.../hive/ql/exec/repl/ReplExternalTables.java | 82 ++++++------
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 5 +-
.../hive/ql/exec/repl/atlas/AtlasRestClient.java | 7 +-
.../ql/exec/repl/atlas/AtlasRestClientBuilder.java | 8 +-
.../ql/exec/repl/atlas/AtlasRestClientImpl.java | 41 +++---
.../ql/exec/repl/atlas/NoOpAtlasRestClient.java | 5 +-
.../exec/repl/atlas/RetryingClientTimeBased.java | 10 +-
.../ql/exec/repl/ranger/RangerRestClientImpl.java | 14 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 12 +-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 37 ++----
.../hive/ql/exec/repl/TestRangerLoadTask.java | 3 +-
.../alter_table_wrong_location2.q.out | 2 +-
24 files changed, 345 insertions(+), 322 deletions(-)
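
The bulk of this patch replaces hand-rolled retry loops (MAX_COPY_RETRY, MAX_IO_ERROR_RETRY) with the time-based Retryable builder and maps retry exhaustion to the new REPL_RETRY_EXHAUSTED error code. The following is a rough sketch of that pattern, not part of the patch; the Retryable and ErrorMsg calls are taken from the hunks below, while the RetryExample class and readStoredValue helper are hypothetical.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public final class RetryExample {

  private RetryExample() {
  }

  // Retry a filesystem read until the configured retry window expires, then surface
  // the failure under the classified REPL_RETRY_EXHAUSTED code.
  static String readWithRetry(HiveConf conf) throws SemanticException {
    Retryable retryable = Retryable.builder()
        .withHiveConf(conf)                               // retry window driven by REPL_RETRY_* configs
        .withRetryOnException(IOException.class)          // transient I/O failures are retried
        .withFailOnException(FileNotFoundException.class) // a missing file fails immediately
        .build();
    try {
      return retryable.executeCallable(() -> readStoredValue(conf));
    } catch (Exception e) {
      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
    }
  }

  // Hypothetical I/O operation, present only to keep the sketch self-contained.
  private static String readStoredValue(HiveConf conf) throws IOException {
    throw new IOException("simulated transient failure");
  }
}
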
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 142d779..24657b1 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
public final class FileUtils {
private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class.getName());
private static final Random random = new Random();
- public static final int MAX_IO_ERROR_RETRY = 5;
public static final int IO_ERROR_SLEEP_TIME = 100;
public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index d943412..8c78da9 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -331,8 +331,8 @@ public enum ErrorMsg {
TABLE_NOT_PARTITIONED(10241, "Table {0} is not a partitioned table", true),
DATABASE_ALREADY_EXISTS(10242, "Database {0} already exists", true),
CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
- BAD_LOCATION_VALUE(10244, "{0} is not absolute. Please specify a complete absolute uri."),
- UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+ BAD_LOCATION_VALUE(10244, "{0} is not absolute. Please specify a complete absolute uri.", true),
+ UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported", true),
INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
PARTITION_SPEC_TYPE_MISMATCH(10248, "Cannot add partition column {0} of type {1} as it cannot be converted to type {2}", true),
@@ -505,18 +505,8 @@ public enum ErrorMsg {
" queue: {1}. Please fix and try again.", true),
SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
- //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
- //method in HiveMetaStoreClient
- REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."),
- REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Load path {0} not valid as target database is bootstrapped " +
- "from some other path : {1}."),
- REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."),
- REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
- REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
- "Source of replication (repl.source.for) is not set in the database properties."),
- REPL_INVALID_DB_OR_TABLE_PATTERN(20021,
- "Invalid pattern for the DB or table name in the replication policy. "
- + "It should be a valid regex enclosed within single or double quotes."),
+ REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20016, "File is missing from both source and cm path."),
+ REPL_EXTERNAL_SERVICE_CONNECTION_ERROR(20017, "Failed to connect to {0} service. Error code {1}.",true),
// An exception from runtime that will show the full stack to client
UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),
@@ -605,9 +595,10 @@ public enum ErrorMsg {
SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"),
SPARK_GET_JOB_INFO_INTERRUPTED(30045, "Spark job was interrupted while getting job info"),
- SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"),
+ SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}", true),
- REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),
+ REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired. Error {0}",
+ true),
SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true),
SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."),
SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true),
@@ -616,7 +607,20 @@ public enum ErrorMsg {
SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true),
- REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.")
+ REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication."),
+ REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(40004,
+ "Source of replication (repl.source.for) is not set in the database properties."),
+ REPL_INVALID_DB_OR_TABLE_PATTERN(40005,
+ "Invalid pattern for the DB or table name in the replication policy. "
+ + "It should be a valid regex enclosed within single or double quotes."),
+ //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
+ //method in HiveMetaStoreClient
+ REPL_EVENTS_MISSING_IN_METASTORE(40006, "Notification events are missing in the meta store."),
+ REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(40007, "Load path {0} not valid as target database is bootstrapped " +
+ "from some other path : {1}.", true),
+ REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true),
+ REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true),
+ REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true)
;
private int errorCode;
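
For illustration only (not part of the patch), a minimal sketch of how the reclassified codes above are expected to be used: parameterized messages are built with format(), and the message text maps back to its numeric code, which is what the test assertions added further down in this diff check. The ErrorCodeExample class is hypothetical.

import org.apache.hadoop.hive.ql.ErrorMsg;

public final class ErrorCodeExample {

  private ErrorCodeExample() {
  }

  public static void main(String[] args) {
    // Parameterized messages are built with format(); the {0}/{1} placeholders come from
    // the enum definitions above.
    String connErr = ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format("ranger", "404");
    String retryErr = ErrorMsg.REPL_RETRY_EXHAUSTED.format("connection reset");

    // The formatted text should map back to its numeric code; the tests added further
    // down in this diff assert exactly this round trip.
    System.out.println(ErrorMsg.getErrorMsg(connErr).getErrorCode());  // expected 20017
    System.out.println(ErrorMsg.getErrorMsg(retryErr).getErrorCode()); // expected 40010
  }
}
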
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index cd48d4b..b1c02e5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -1957,6 +1957,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
replica.dump(replicatedDbName);
} catch (Exception e) {
Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage());
+ Assert.assertEquals(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getErrorCode(),
+ ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index b4666ba..d735c9b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -1600,6 +1602,27 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] {"1", "2"});
}
+ @Test
+ public void testRangerReplicationRetryExhausted() throws Throwable {
+ List<String> clause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION
+ + "'='30s'", "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL + "'='false'", "'" + HiveConf.ConfVars.HIVE_IN_TEST
+ + "'='false'");
+ try {
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, clause);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals(ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode(),
+ ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
+ }
+ }
+
/*
Can't test complete replication as mini ranger is not supported
Testing just the configs and no impact on existing replication
@@ -1617,7 +1640,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
try {
primary.dump(primaryDbName, clause);
} catch (SemanticException e) {
- assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
+ assertEquals("Invalid config error : Authorizer sentry not supported for replication " +
+ "for ranger service.", e.getMessage());
+ assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(),
+ ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
}
}
@@ -1732,7 +1758,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
}
Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
} catch (SemanticException e) {
- assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication"));
+ assertEquals(e.getMessage(), ("Invalid config error : " + conf
+ + " is mandatory config for Atlas metadata replication for atlas service."));
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 3bc63a3..bd2a439 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -415,6 +415,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
LOG.info("Got exception: {}", ex.getMessage());
Assert.assertTrue(ex instanceof SemanticException);
Assert.assertTrue(ex.getMessage().equals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg()));
+ Assert.assertEquals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getErrorCode(),
+ ErrorMsg.getErrorMsg(ex.getMessage()).getErrorCode());
failed = true;
}
Assert.assertTrue(failed);
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index fb38b8c..f4e7c4f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3120,13 +3120,6 @@ public class TestJdbcDriver2 {
assertTrue(e.getErrorCode() == ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getErrorCode());
}
- try {
- // invalid load path
- stmt.execute("repl load default into default1");
- } catch(SQLException e){
- assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode());
- }
-
stmt.close();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index e6384af..8020fa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -46,6 +47,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.FileNotFoundException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
@@ -132,31 +134,43 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
private long lastStoredTimeStamp() throws SemanticException {
Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
- BufferedReader br = null;
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class)
+ .withFailOnException(FileNotFoundException.class).build();
try {
- FileSystem fs = prevMetadataPath.getFileSystem(conf);
- br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
- String line = br.readLine();
- if (line == null) {
- throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
- }
- String[] lineContents = line.split("\t", 5);
- return Long.parseLong(lineContents[1]);
- } catch (Exception ex) {
- throw new SemanticException(ex);
- } finally {
- if (br != null) {
+ return retryable.executeCallable(() -> {
+ BufferedReader br = null;
try {
- br.close();
- } catch (IOException e) {
- throw new SemanticException(e);
+ FileSystem fs = prevMetadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+ String line = br.readLine();
+ if (line == null) {
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE
+ .format("Could not read lastStoredTimeStamp from atlas metadata file",
+ ReplUtils.REPL_ATLAS_SERVICE));
+ }
+ String[] lineContents = line.split("\t", 5);
+ return Long.parseLong(lineContents[1]);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ //Do nothing
+ }
+ }
}
- }
+ });
+ } catch (SemanticException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
- AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+ AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster(), conf);
long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
LOG.debug("Current timestamp is: {}", ret);
@@ -177,13 +191,13 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
} catch (SemanticException ex) {
throw ex;
} catch (Exception ex) {
- throw new SemanticException(ex);
+ throw new SemanticException(ex.getMessage(), ex);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
- throw new SemanticException(e);
+ //Do nothing
}
}
}
@@ -196,12 +210,14 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
if (entries == null || entries.isEmpty()) {
- throw new SemanticException("Could find entries in objectId for:" + clusterName);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Could find " +
+ "entries in objectId for:" + clusterName, ReplUtils.REPL_ATLAS_SERVICE));
}
Map.Entry<String, Object> item = entries.iterator().next();
String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue());
if (guid == null || guid.isEmpty()) {
- throw new SemanticException("Entity not found:" + objectId);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE
+ .format("Entity not found:" + objectId, ReplUtils.REPL_ATLAS_SERVICE));
}
return guid;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index b24b3d6..1506db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
@@ -48,6 +49,7 @@ import java.net.URL;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
/**
* Atlas Metadata Replication Load Task.
@@ -113,26 +115,30 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
- BufferedReader br = null;
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
try {
- FileSystem fs = metadataPath.getFileSystem(conf);
- br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
- String line = br.readLine();
- if (line == null) {
- throw new SemanticException("Could not read stored src FS Uri from atlas metadata file");
- }
- String[] lineContents = line.split("\t", 5);
- return lineContents[0];
- } catch (Exception ex) {
- throw new SemanticException(ex);
- } finally {
- if (br != null) {
+ return retryable.executeCallable(() -> {
+ BufferedReader br = null;
try {
- br.close();
- } catch (IOException e) {
- throw new SemanticException(e);
+ FileSystem fs = metadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+ String line = br.readLine();
+ if (line == null) {
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Could not read stored " +
+ "src FS Uri from atlas metadata file", ReplUtils.REPL_ATLAS_SERVICE));
+ }
+ String[] lineContents = line.split("\t", 5);
+ return lineContents[0];
+ } finally {
+ if (br != null) {
+ br.close();
+ }
}
- }
+ });
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
index e8a8df1..5ed09f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -43,7 +44,6 @@ import java.util.Collections;
*/
public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
- private static final int MAX_COPY_RETRY = 5;
private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
FileSystem targetFs = destPath.getFileSystem(conf);
@@ -51,7 +51,8 @@ public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
if (!targetFs.exists(destPath)) {
// target path is created even if the source path is missing, so that ddl task does not try to create it.
if (!targetFs.mkdirs(destPath)) {
- throw new IOException(destPath + " is not a directory or unable to create one");
+ throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.format(
+ destPath + " is not a directory or unable to create one"));
}
createdDir = true;
}
@@ -86,107 +87,61 @@ public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> sourcePath.getFileSystem(conf).exists(sourcePath));
}
- private int handleException(Exception e, Path sourcePath, Path targetPath,
- int currentRetry, UserGroupInformation proxyUser) {
- try {
- LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
- if (!checkIfPathExist(sourcePath, proxyUser)) {
- LOG.info("Source path is missing. Ignoring exception.");
- return 0;
- }
- } catch (Exception ex) {
- LOG.warn("Source path missing check failed. ", ex);
- }
- // retry logic only for i/o exception
- if (!(e instanceof IOException)) {
- LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
- setException(e);
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
- }
-
- if (currentRetry <= MAX_COPY_RETRY) {
- LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
- } else {
- LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
- setException(e);
- return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
- }
- int sleepTime = FileUtils.getSleepTime(currentRetry);
- LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException timerEx) {
- LOG.info("Sleep interrupted", timerEx.getMessage());
- }
- try {
- if (proxyUser == null) {
- proxyUser = Utils.getUGI();
- }
- FileSystem.closeAllForUGI(proxyUser);
- } catch (Exception ex) {
- LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
- }
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
- }
-
@Override
public int execute() {
String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ return retryable.executeCallable(() -> {
+ UserGroupInformation proxyUser = null;
+ Path sourcePath = work.getFullyQualifiedSourcePath();
+ Path targetPath = work.getFullyQualifiedTargetPath();
+ try {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+ sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
+ targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
+ }
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+ if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+ proxyUser = UserGroupInformation.createProxyUser(
+ distCpDoAsUser, UserGroupInformation.getLoginUser());
+ }
- Path sourcePath = work.getFullyQualifiedSourcePath();
- Path targetPath = work.getFullyQualifiedTargetPath();
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
- sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
- targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
- }
- int currentRetry = 0;
- int error = 0;
- UserGroupInformation proxyUser = null;
- while (currentRetry <= MAX_COPY_RETRY) {
- try {
- UserGroupInformation ugi = Utils.getUGI();
- String currentUser = ugi.getShortUserName();
- if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
- proxyUser = UserGroupInformation.createProxyUser(
- distCpDoAsUser, UserGroupInformation.getLoginUser());
- }
-
- setTargetPathOwner(targetPath, sourcePath, proxyUser);
-
- // do we create a new conf and only here provide this additional option so that we get away from
- // differences of data in two location for the same directories ?
- // basically add distcp.options.delete to hiveconf new object ?
- FileUtils.distCp(
- sourcePath.getFileSystem(conf), // source file system
- Collections.singletonList(sourcePath), // list of source paths
- targetPath,
- false,
- proxyUser,
- conf,
- ShimLoader.getHadoopShims());
- return 0;
- } catch (Exception e) {
- currentRetry++;
- error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
- if (error == 0) {
- return 0;
- }
- } finally {
- if (proxyUser != null) {
+ setTargetPathOwner(targetPath, sourcePath, proxyUser);
try {
- FileSystem.closeAllForUGI(proxyUser);
- } catch (IOException e) {
- LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
- if (error == 0) {
- setException(e);
- error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ if (!checkIfPathExist(sourcePath, proxyUser)) {
+ LOG.info("Source path is missing. Ignoring exception.");
+ return 0;
}
- break;
+ } catch (Exception ex) {
+ LOG.warn("Source path missing check failed. ", ex);
+ //Should be retried
+ throw new IOException(ex);
+ }
+ // do we create a new conf and only here provide this additional option so that we get away from
+ // differences of data in two location for the same directories ?
+ // basically add distcp.options.delete to hiveconf new object ?
+ FileUtils.distCp(
+ sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath,
+ false,
+ proxyUser,
+ conf,
+ ShimLoader.getHadoopShims());
+ return 0;
+ } finally {
+ if (proxyUser != null) {
+ FileSystem.closeAllForUGI(proxyUser);
}
}
- }
+ });
+ } catch (Exception e) {
+ throw new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
- return error;
}
private static Path reservedRawPath(URI uri) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 4c1a170..0576ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -92,14 +92,22 @@ public class RangerDumpTask extends Task<RangerDumpWork> implements Serializable
}
URL url = work.getRangerConfigResource();
if (url == null) {
- throw new SemanticException("Ranger configuration is not valid "
- + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+ .format("Ranger configuration is not valid "
+ + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, ReplUtils.REPL_RANGER_SERVICE));
}
conf.addResource(url);
String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
- if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
- throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
+ if (StringUtils.isEmpty(rangerEndpoint)) {
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+ .format("Ranger endpoint is not valid "
+ + rangerEndpoint, ReplUtils.REPL_RANGER_SERVICE));
+ }
+ if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) {
+ throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format(ReplUtils
+ .REPL_RANGER_SERVICE,
+ "Ranger endpoint is not reachable " + rangerEndpoint));
}
RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
work.getDbName(), rangerHiveServiceName, conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index 3c7e9e24..240caf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -89,14 +89,22 @@ public class RangerLoadTask extends Task<RangerLoadWork> implements Serializable
}
URL url = work.getRangerConfigResource();
if (url == null) {
- throw new SemanticException("Ranger configuration is not valid "
- + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+ .format("Ranger configuration is not valid "
+ + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, ReplUtils.REPL_RANGER_SERVICE));
}
conf.addResource(url);
String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
- if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
- throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
+ if (StringUtils.isEmpty(rangerEndpoint)) {
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+ .format("Ranger endpoint is not valid "
+ + rangerEndpoint, ReplUtils.REPL_RANGER_SERVICE));
+ }
+ if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) {
+ throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format(ReplUtils
+ .REPL_RANGER_SERVICE,
+ "Ranger endpoint is not valid " + rangerEndpoint));
}
if (work.getCurrentDumpPath() != null) {
LOG.info("Importing Ranger Metadata from {} ", work.getCurrentDumpPath());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index dc8d790..214c12d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.Constants;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
-import org.apache.hadoop.hive.metastore.utils.Retry;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -87,12 +85,10 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.io.IOUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.login.LoginException;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
@@ -218,8 +214,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
childTasks.add(rangerDumpTask);
} else {
- throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
- + " not supported for replication ");
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer "
+ + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+ + " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE));
}
}
@@ -647,6 +644,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
return currentEventMaxLimit;
}
+
private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException {
Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
Retryable retryable = Retryable.builder()
@@ -668,22 +666,29 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
private long getResumeFrom(Path ackFile) throws SemanticException {
- BufferedReader br = null;
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(Exception.class).build();
try {
- FileSystem fs = ackFile.getFileSystem(conf);
- br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset()));
- long lastEventID = Long.parseLong(br.readLine());
- return lastEventID;
- } catch (Exception ex) {
- throw new SemanticException(ex);
- } finally {
- if (br != null) {
+ return retryable.executeCallable(() -> {
+ BufferedReader br = null;
try {
- br.close();
- } catch (IOException e) {
- throw new SemanticException(e);
+ FileSystem fs = ackFile.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset()));
+ long lastEventID = Long.parseLong(br.readLine());
+ return lastEventID;
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (Exception e) {
+ //Do nothing
+ }
+ }
}
- }
+ });
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@@ -729,7 +734,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, String dbName,
- HiveConf hiveConf) throws IOException, LoginException {
+ HiveConf hiveConf) throws Exception {
// Empty list will create an empty file to distinguish it from db level replication. If no file is there, that means
// db level replication. If empty file is there, means no table satisfies the policy.
if (tableList == null) {
@@ -738,12 +743,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
// The table list is dumped in _tables/dbname file
- Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
- tableListFile = new Path(tableListFile, dbName.toLowerCase());
-
- int count = 0;
- while (count < FileUtils.MAX_IO_ERROR_RETRY) {
- try (FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile)) {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ retryable.executeCallable((Callable<Void>) () -> {
+ Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+ tableListFile = new Path(tableListFile, dbName.toLowerCase());
+ FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile);
for (String tableName : tableList) {
String line = tableName.toLowerCase().concat("\n");
writer.write(line.getBytes(StandardCharsets.UTF_8));
@@ -751,27 +758,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Close is called explicitly as close also calls the actual file system write,
// so there is chance of i/o exception thrown by close.
writer.close();
- break;
- } catch (IOException e) {
- LOG.info("File operation failed", e);
- if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) {
- //no need to wait in the last iteration
- LOG.error("File " + tableListFile.toUri() + " creation failed even after " +
- FileUtils.MAX_IO_ERROR_RETRY + " attempts.");
- throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
- }
- int sleepTime = FileUtils.getSleepTime(count);
- LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1));
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException timerEx) {
- LOG.info("Sleep interrupted", timerEx.getMessage());
- }
- FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
- }
- count++;
+ LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
+ return null;
+ });
+ } catch (Exception e) {
+ FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
- LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
}
Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index b6e0858..796cfae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -48,6 +50,7 @@ import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* Format of the file used to dump information about external tables:
@@ -62,7 +65,6 @@ public final class ReplExternalTables {
private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class);
private static final String FIELD_SEPARATOR = ",";
public static final String FILE_NAME = "_external_tables_info";
- private static final int MAX_RETRIES = 5;
private ReplExternalTables(){}
@@ -79,9 +81,9 @@ public final class ReplExternalTables {
String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
URI baseDirUri = StringUtils.isEmpty(baseDir) ? null : new Path(baseDir).toUri();
if (baseDirUri == null || baseDirUri.getScheme() == null || baseDirUri.getAuthority() == null) {
- throw new SemanticException(
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
String.format("Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required %s",
- baseDir == null ? "" : "- ('" + baseDir + "')"));
+ baseDir == null ? "" : "- ('" + baseDir + "')"), ReplUtils.REPL_HIVE_SERVICE));
}
return new Path(baseDirUri);
}
@@ -102,7 +104,8 @@ public final class ReplExternalTables {
basePath.getFileSystem(hiveConf)
);
} catch (IOException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
+ ErrorMsg.INVALID_PATH.getMsg(), ReplUtils.REPL_HIVE_SERVICE), e);
}
return dataPath;
}
@@ -196,23 +199,22 @@ public final class ReplExternalTables {
return lineToWrite.toString();
}
- private void write(String line) throws InterruptedException {
- int currentRetry = 0;
- while (currentRetry < MAX_RETRIES) {
- try {
- writer.write(line.getBytes(StandardCharsets.UTF_8));
- break;
- } catch (IOException e) {
- currentRetry++;
- if (currentRetry < MAX_RETRIES) {
- LOG.warn("failed to write data with maxRetries {} due to", currentRetry, e);
- } else {
- LOG.error("failed to write data with maxRetries {} due to", currentRetry, e);
- throw new RuntimeException("failed to write data", e);
+ private void write(String line) throws SemanticException {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(hiveConf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ retryable.executeCallable((Callable<Void>) () -> {
+ try {
+ writer.write(line.getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ writer = openWriterAppendMode();
+ throw e;
}
- Thread.sleep(100 * currentRetry * currentRetry);
- writer = openWriterAppendMode();
- }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@@ -288,33 +290,27 @@ public final class ReplExternalTables {
if (!fileSystem.exists(externalTableInfo)) {
return locationsToCopy;
}
-
- int currentRetry = 0;
- BufferedReader reader = null;
- while (currentRetry < MAX_RETRIES) {
- try {
- reader = reader(fileSystem, externalTableInfo);
- for (String line = reader.readLine(); line != null; line = reader.readLine()) {
- String[] splits = line.split(FIELD_SEPARATOR);
- locationsToCopy
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(hiveConf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ return retryable.executeCallable(() -> {
+ BufferedReader reader = null;
+ try {
+ reader = reader(fileSystem, externalTableInfo);
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ String[] splits = line.split(FIELD_SEPARATOR);
+ locationsToCopy
.add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8));
- }
- return locationsToCopy;
- } catch (IOException e) {
- currentRetry++;
- if (currentRetry < MAX_RETRIES) {
+ }
+ return locationsToCopy;
+ } finally {
closeQuietly(reader);
- LOG.warn("failed to read {}", externalTableInfo.toString(), e);
- } else {
- LOG.error("failed to read {}", externalTableInfo.toString(), e);
- throw e;
}
- } finally {
- closeQuietly(reader);
- }
+ });
+ } catch (Exception e) {
+ throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
- // we should never reach here
- throw new IllegalStateException("we should never reach this condition");
}
private static void closeQuietly(BufferedReader reader) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3c3dc44..8029b72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -161,8 +161,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
childTasks.add(rangerLoadTask);
} else {
- throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
- + " not supported for replication ");
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " +
+ conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+ + " not supported for replication ", ReplUtils.REPL_RANGER_SERVICE));
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
index dd72f83..e49664d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.io.InputStream;
@@ -30,14 +31,14 @@ import java.io.InputStream;
*/
public interface AtlasRestClient {
- InputStream exportData(AtlasExportRequest request) throws Exception;
+ InputStream exportData(AtlasExportRequest request) throws SemanticException;
AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception;
- AtlasServer getServer(String endpoint) throws SemanticException;
+ AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException;
String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName)
throws SemanticException;
- boolean getStatus() throws SemanticException;
+ boolean getStatus(HiveConf conf) throws SemanticException;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
index 6f24f57..6c609aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -23,6 +23,8 @@ import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -64,7 +66,8 @@ public class AtlasRestClientBuilder {
private AtlasRestClient create() throws SemanticException {
if (baseUrls == null || baseUrls.length == 0) {
- throw new SemanticException("baseUrls is not set.");
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("baseUrls is not set.",
+ ReplUtils.REPL_ATLAS_SERVICE));
}
setUGInfo();
initializeAtlasApplicationProperties();
@@ -92,7 +95,8 @@ public class AtlasRestClientBuilder {
props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true");
ApplicationProperties.set(ConfigurationConverter.getConfiguration(props));
} catch (AtlasException e) {
- throw new SemanticException(e);
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format(e.getMessage(),
+ ReplUtils.REPL_ATLAS_SERVICE), e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index 71e51fb..e4b294d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +87,7 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
}
}
- public InputStream exportData(AtlasExportRequest request) throws Exception {
+ public InputStream exportData(AtlasExportRequest request) throws SemanticException {
LOG.debug("exportData: {}" + request);
return invokeWithRetry(new Callable<InputStream>() {
@Override
@@ -125,17 +127,15 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
return new AtlasImportResult(request, "", "", "", 0L);
}
- public AtlasServer getServer(String endpoint) throws SemanticException {
+ public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(AtlasServiceException.class).build();
try {
- return clientV2.getServer(endpoint);
- } catch (AtlasServiceException e) {
- int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
- if (statusCode != NOT_FOUND.getStatusCode()) {
- throw new SemanticException("Exception while getServer ", e.getCause());
- }
- LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage());
+ return retryable.executeCallable(() -> clientV2.getServer(endpoint));
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
- return null;
}
public String getEntityGuid(final String entityType,
@@ -149,12 +149,7 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
try {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout(
- new Callable<AtlasEntity.AtlasEntityWithExtInfo>() {
- @Override
- public AtlasEntity.AtlasEntityWithExtInfo call() throws Exception {
- return clientV2.getEntityByAttribute(entityType, attributes);
- }
- }, entityApiTimeOut, TimeUnit.SECONDS);
+ () -> clientV2.getEntityByAttribute(entityType, attributes), entityApiTimeOut, TimeUnit.SECONDS);
if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) {
LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}",
@@ -165,7 +160,8 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
} catch (AtlasServiceException e) {
int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
if (statusCode != NOT_FOUND.getStatusCode()) {
- throw new SemanticException("Exception while getEntityGuid ", e.getCause());
+ throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Exception " +
+ "while getEntityGuid ", ReplUtils.REPL_ATLAS_SERVICE), e.getCause());
}
LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}",
entityType, attributeName, qualifiedName, e.getMessage());
@@ -175,11 +171,14 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla
}
}
- public boolean getStatus() throws SemanticException {
+ public boolean getStatus(HiveConf conf) throws SemanticException {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(Exception.class).build();
try {
- return clientV2.isServerReady();
- } catch (AtlasServiceException e) {
- throw new SemanticException(e.getCause());
+ return retryable.executeCallable(() -> clientV2.isServerReady());
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
index 59dcbf7..d88fe64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.conf.HiveConf;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@@ -42,7 +43,7 @@ public class NoOpAtlasRestClient implements AtlasRestClient {
return new AtlasImportResult(request, "", "", "", 0L);
}
- public AtlasServer getServer(String endpoint) {
+ public AtlasServer getServer(String endpoint, HiveConf conf) {
return new AtlasServer();
}
@@ -51,7 +52,7 @@ public class NoOpAtlasRestClient implements AtlasRestClient {
return UUID.randomUUID().toString();
}
- public boolean getStatus() {
+ public boolean getStatus(HiveConf conf) {
return true;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
index 25471a4..6ddb114 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.repl.atlas;
import com.sun.jersey.api.client.UniformInterfaceException;
import org.apache.atlas.AtlasServiceException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +43,7 @@ public class RetryingClientTimeBased {
protected double backOff;
protected int maxJitterInSeconds;
- protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+ protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws SemanticException {
long startTime = System.currentTimeMillis();
long delay = this.initialDelayInSeconds;
while (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
@@ -58,7 +60,7 @@ public class RetryingClientTimeBased {
return null;
}
LOG.error(func.getClass().getName(), e);
- throw new Exception(e);
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
}
}
return defaultReturnValue;
@@ -100,7 +102,7 @@ public class RetryingClientTimeBased {
|| e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
}
- private boolean processImportExportLockException(Exception e, long delay) throws Exception {
+ private boolean processImportExportLockException(Exception e, long delay) throws SemanticException {
if (!(e instanceof AtlasServiceException)) {
return false;
}
@@ -111,7 +113,7 @@ public class RetryingClientTimeBased {
Thread.sleep(delay);
} catch (InterruptedException intEx) {
LOG.error("Pause wait interrupted!", intEx);
- throw new Exception(intEx);
+ throw new SemanticException(intEx);
}
return true;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index adaaa02..87a2395 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.Retry;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -83,7 +85,8 @@ public class RangerRestClientImpl implements RangerRestClient {
HiveConf hiveConf)throws SemanticException {
LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint);
if (StringUtils.isEmpty(rangerHiveServiceName)) {
- throw new SemanticException("Ranger Service Name cannot be empty");
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service Name " +
+ "cannot be empty", ReplUtils.REPL_RANGER_SERVICE));
}
Retryable retryable = Retryable.builder()
.withHiveConf(hiveConf)
@@ -92,7 +95,7 @@ public class RangerRestClientImpl implements RangerRestClient {
return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName,
dbName));
} catch (Exception e) {
- throw new SemanticException(e);
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@@ -361,7 +364,7 @@ public class RangerRestClientImpl implements RangerRestClient {
return retryable.executeCallable(() -> writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName,
stagingDirPath, conf));
} catch (Exception e) {
- throw new SemanticException(e);
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@@ -392,7 +395,7 @@ public class RangerRestClientImpl implements RangerRestClient {
try {
return retryable.executeCallable(() -> checkConnectionPlain(url));
} catch (Exception e) {
- throw new SemanticException(e);
+ throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@@ -408,7 +411,8 @@ public class RangerRestClientImpl implements RangerRestClient {
public List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName,
String sourceDb, String targetDb) throws SemanticException {
if (StringUtils.isEmpty(rangerServiceName)) {
- throw new SemanticException("Ranger Service Name cannot be empty");
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service " +
+ "Name cannot be empty", ReplUtils.REPL_RANGER_SERVICE));
}
RangerPolicy denyRangerPolicy = new RangerPolicy();
denyRangerPolicy.setService(rangerServiceName);
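The Ranger client now separates the two failure classes this change introduces: a misconfiguration (empty service name) is reported as REPL_INVALID_CONFIG_FOR_SERVICE tagged with the "ranger" service, while a call that still fails after the Retryable policy is spent becomes REPL_RETRY_EXHAUSTED. Condensed from the hunks above, with the surrounding method body elided:

    if (StringUtils.isEmpty(rangerServiceName)) {
      // Non-retryable and user-fixable: attribute the bad config to the Ranger service.
      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(
          "Ranger Service Name cannot be empty", ReplUtils.REPL_RANGER_SERVICE));
    }
    try {
      return retryable.executeCallable(() -> checkConnectionPlain(url));
    } catch (Exception e) {
      // Transient failures: wrap the last error once the retry budget is exhausted.
      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
    }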
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index d531f5b..137cc29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -131,6 +131,15 @@ public class ReplUtils {
public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml";
public static final String TARGET_OF_REPLICATION = "repl.target.for";
+
+ // Service name for hive.
+ public static final String REPL_HIVE_SERVICE = "hive";
+
+ // Service name for ranger.
+ public static final String REPL_RANGER_SERVICE = "ranger";
+
+ // Service name for atlas.
+ public static final String REPL_ATLAS_SERVICE = "atlas";
/**
* Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
*/
@@ -224,7 +233,8 @@ public class ReplUtils {
throws SemanticException {
String val = hiveConf.get(configParam);
if (StringUtils.isEmpty(val)) {
- throw new SemanticException(String.format(errorMsgFormat, configParam));
+ throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(String.format(
+ errorMsgFormat, configParam), ReplUtils.REPL_ATLAS_SERVICE));
}
return val;
}
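ReplUtils now carries the per-service names ("hive", "ranger", "atlas") used to tag classified errors, and its mandatory-config check raises REPL_INVALID_CONFIG_FOR_SERVICE scoped to the Atlas service. A hypothetical call site; the helper name, parameter order, and config key below are illustrative only:

    // If the endpoint is unset, this surfaces as a non-retryable config error
    // attributed to the "atlas" service rather than a generic failure.
    String atlasEndpoint = getNonEmpty("hive.repl.atlas.endpoint", hiveConf,
        "Config %s is mandatory for Atlas replication");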
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 3fb271d..c386aee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -27,7 +27,9 @@ import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -43,6 +45,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
public class CopyUtils {
@@ -297,39 +300,27 @@ public class CopyUtils {
return false;
}
- private UserGroupInformation getProxyUser() throws LoginException, IOException {
+ private UserGroupInformation getProxyUser() throws IOException {
if (copyAsUser == null) {
return null;
}
- UserGroupInformation proxyUser = null;
- int currentRetry = 0;
- while (currentRetry <= MAX_IO_RETRY) {
- try {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(hiveConf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ return retryable.executeCallable(() -> {
+ UserGroupInformation proxyUser = null;
UserGroupInformation ugi = Utils.getUGI();
String currentUser = ugi.getShortUserName();
if (!currentUser.equals(copyAsUser)) {
proxyUser = UserGroupInformation.createProxyUser(
- copyAsUser, UserGroupInformation.getLoginUser());
+ copyAsUser, UserGroupInformation.getLoginUser());
}
return proxyUser;
- } catch (IOException e) {
- currentRetry++;
- if (currentRetry <= MAX_IO_RETRY) {
- LOG.warn("Unable to get UGI info", e);
- } else {
- LOG.error("Unable to get UGI info", e);
- throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
- }
- int sleepTime = FileUtils.getSleepTime(currentRetry);
- LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry));
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException timerEx) {
- LOG.info("Sleep interrupted", timerEx.getMessage());
- }
- }
+ });
+ } catch (Exception e) {
+ throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
- return null;
}
// Copy without retry
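getProxyUser drops the hand-rolled MAX_IO_RETRY/sleep loop in favour of the shared Retryable helper, and retry exhaustion is reported through REPL_RETRY_EXHAUSTED. The same pattern reduced to a sketch; the callable body is a placeholder for any transient I/O-bound call:

    Retryable retryable = Retryable.builder()
        .withHiveConf(hiveConf)
        .withRetryOnException(IOException.class)   // only transient I/O errors are retried
        .build();
    try {
      return retryable.executeCallable(() -> doIoCallThatMayFailTransiently());
    } catch (Exception e) {
      // Retries spent: classify the failure instead of returning null or a raw error.
      throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
    }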
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
index dbbbf2c..c51618b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
import com.google.gson.Gson;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
@@ -79,7 +80,7 @@ public class TestRangerLoadTask {
@Test
public void testFailureInvalidAuthProviderEndpoint() {
int status = task.execute();
- Assert.assertEquals(40000, status);
+ Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), status);
}
@Test
diff --git a/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out b/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
index bd8892a..54b8a3f 100644
--- a/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
+++ b/ql/src/test/results/clientnegative/alter_table_wrong_location2.q.out
@@ -11,4 +11,4 @@ PREHOOK: type: ALTERTABLE_LOCATION
PREHOOK: Input: default@testwrongloc
PREHOOK: Output: default@testwrongloc
#### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.ddl.DDLTask. {0} is not absolute. Please specify a complete absolute uri. relative/testwrongloc
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.ddl.DDLTask. relative/testwrongloc is not absolute. Please specify a complete absolute uri.
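The golden-file change above follows from the same ErrorMsg rework: the {0} placeholder in the message template is now substituted with the offending path rather than the raw template being printed with the argument appended. A sketch of the substitution only, assuming MessageFormat-style templating:

    String template = "{0} is not absolute. Please specify a complete absolute uri.";
    String formatted = java.text.MessageFormat.format(template, "relative/testwrongloc");
    // formatted == "relative/testwrongloc is not absolute. Please specify a complete absolute uri."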