You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/06/10 14:19:56 UTC
[hive] branch master updated: HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 84ee70e HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
84ee70e is described below
commit 84ee70e8ae6e9cd41e36feab16cbcc5afad50a0b
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Wed Jun 10 19:49:44 2020 +0530
HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../ql/exec/repl/ranger/RangerRestClientImpl.java | 171 ++++++++++++---------
.../apache/hadoop/hive/metastore/utils/Retry.java | 21 ++-
.../hadoop/hive/metastore/utils/RetryTest.java | 69 ++++++++-
3 files changed, 187 insertions(+), 74 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 1b17632..13d3836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -72,7 +72,6 @@ public class RangerRestClientImpl implements RangerRestClient {
String dbName,
String rangerHiveServiceName)throws SemanticException {
LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint);
- ClientResponse clientResp;
String uri;
if (StringUtils.isEmpty(rangerHiveServiceName)) {
throw new SemanticException("Ranger Service Name cannot be empty");
@@ -85,33 +84,43 @@ public class RangerRestClientImpl implements RangerRestClient {
}
String url = sourceRangerEndpoint + (uri.startsWith("/") ? uri : ("/" + uri));
LOG.debug("Url to export policies from source Ranger: {}", url);
- RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
- WebResource.Builder builder = getRangerResourceBuilder(url);
- clientResp = builder.get(ClientResponse.class);
-
- String response = null;
- if (clientResp != null) {
- if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
- Gson gson = new GsonBuilder().create();
- response = clientResp.getEntity(String.class);
- LOG.debug("Response received for ranger export {} ", response);
- if (StringUtils.isNotEmpty(response)) {
- rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
- return rangerExportPolicyList;
+
+ Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
+ @Override
+ public RangerExportPolicyList execute() throws Exception {
+ WebResource.Builder builder = getRangerResourceBuilder(url);
+ RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
+ ClientResponse clientResp = builder.get(ClientResponse.class);
+ String response = null;
+ if (clientResp != null) {
+ if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
+ Gson gson = new GsonBuilder().create();
+ response = clientResp.getEntity(String.class);
+ LOG.debug("Response received for ranger export {} ", response);
+ if (StringUtils.isNotEmpty(response)) {
+ rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
+ return rangerExportPolicyList;
+ }
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+ LOG.debug("Ranger policy export request returned empty list");
+ return rangerExportPolicyList;
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+ throw new SemanticException("Authentication Failure while communicating to Ranger admin");
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
+ throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+ }
}
- } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
- LOG.debug("Ranger policy export request returned empty list");
- return rangerExportPolicyList;
- } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
- throw new SemanticException("Authentication Failure while communicating to Ranger admin");
- } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
- throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+ if (StringUtils.isEmpty(response)) {
+ LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+ }
+ return null;
}
+ };
+ try {
+ return retriable.runWithDelay();
+ } catch (Exception e) {
+ throw new SemanticException(e);
}
- if (StringUtils.isEmpty(response)) {
- LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
- }
- return rangerExportPolicyList;
}
public List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies) {
@@ -170,50 +179,60 @@ public class RangerRestClientImpl implements RangerRestClient {
+ (uri.startsWith("/") ? uri : ("/" + uri));
LOG.debug("URL to import policies on target Ranger: {}", url);
- ClientResponse clientResp = null;
-
- StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
- new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
- rangerPoliciesJsonFileName);
- StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
- new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
-
- FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
- MultiPart multipartEntity = null;
- try {
- multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
- WebResource.Builder builder = getRangerResourceBuilder(url);
- clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
- .post(ClientResponse.class, multipartEntity);
- if (clientResp != null) {
- if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
- LOG.debug("Ranger policy import finished successfully");
-
- } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
- throw new Exception("Authentication Failure while communicating to Ranger admin");
- } else {
- throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
- }
- }
- } finally {
- try {
- if (filePartPolicies != null) {
- filePartPolicies.cleanup();
- }
- if (filePartServiceMap != null) {
- filePartServiceMap.cleanup();
- }
- if (formDataMultiPart != null) {
- formDataMultiPart.close();
- }
- if (multipartEntity != null) {
- multipartEntity.close();
+ Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
+ @Override
+ public RangerExportPolicyList execute() throws Exception {
+ ClientResponse clientResp = null;
+
+ StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
+ new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
+ rangerPoliciesJsonFileName);
+ StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
+ new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
+
+ FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
+ MultiPart multipartEntity = null;
+ try {
+ multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
+ WebResource.Builder builder = getRangerResourceBuilder(url);
+ clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
+ .post(ClientResponse.class, multipartEntity);
+ if (clientResp != null) {
+ if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+ LOG.debug("Ranger policy import finished successfully");
+
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+ throw new Exception("Authentication Failure while communicating to Ranger admin");
+ } else {
+ throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
+ }
+ }
+ } finally {
+ try {
+ if (filePartPolicies != null) {
+ filePartPolicies.cleanup();
+ }
+ if (filePartServiceMap != null) {
+ filePartServiceMap.cleanup();
+ }
+ if (formDataMultiPart != null) {
+ formDataMultiPart.close();
+ }
+ if (multipartEntity != null) {
+ multipartEntity.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Exception occurred while closing resources: {}", e);
+ }
}
- } catch (IOException e) {
- LOG.error("Exception occurred while closing resources: {}", e);
+ return rangerExportPolicyList;
}
+ };
+ try {
+ return retriable.runWithDelay();
+ } catch (Exception e) {
+ throw new SemanticException(e);
}
- return rangerExportPolicyList;
}
private synchronized Client getRangerClient() {
@@ -342,11 +361,21 @@ public class RangerRestClientImpl implements RangerRestClient {
}
@Override
- public boolean checkConnection(String url) {
- WebResource.Builder builder;
- builder = getRangerResourceBuilder(url);
- ClientResponse clientResp = builder.get(ClientResponse.class);
- return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+ public boolean checkConnection(String url) throws SemanticException {
+ Retry<Boolean> retriable = new Retry<Boolean>(Exception.class) {
+ @Override
+ public Boolean execute() throws Exception {
+ WebResource.Builder builder;
+ builder = getRangerResourceBuilder(url);
+ ClientResponse clientResp = builder.get(ClientResponse.class);
+ return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+ }
+ };
+ try {
+ return retriable.runWithDelay();
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
}
@Override
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
index bdb269a..032eaf4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
@@ -23,7 +23,8 @@ package org.apache.hadoop.hive.metastore.utils;
*/
public abstract class Retry<T> {
- public static final int MAX_RETRIES = 3;
+ public static final int MAX_RETRIES = 4;
+ public static final int DELAY = 30 * 1000;
private int tries = 0;
private Class retryExceptionType;
@@ -49,4 +50,22 @@ public abstract class Retry<T> {
}
}
}
+
+ public T runWithDelay() throws Exception {
+ try {
+ return execute();
+ } catch(Exception e) {
+ if (e.getClass().equals(retryExceptionType)){
+ tries++;
+ if (MAX_RETRIES == tries) {
+ throw e;
+ } else {
+ Thread.sleep(DELAY * tries);
+ return runWithDelay();
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
}
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
index 67bd658..8cff68d 100644
--- a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
@@ -28,15 +28,21 @@ public class RetryTest {
@Test
public void testRetrySuccess() {
Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ private int count = 0;
@Override
public Void execute() {
- throw new NullPointerException();
+ if (count < 1) {
+ count++;
+ throw new NullPointerException();
+ } else {
+ return null;
+ }
}
};
try {
retriable.run();
} catch (Exception e) {
- Assert.assertEquals(NullPointerException.class, e.getClass());
+ Assert.fail();
}
}
@@ -50,8 +56,67 @@ public class RetryTest {
};
try {
retriable.run();
+ Assert.fail();
} catch (Exception e) {
Assert.assertEquals(RuntimeException.class, e.getClass());
}
}
+
+ @Test
+ public void testRetryFailureWithDelay() {
+ Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ @Override
+ public Void execute() {
+ throw new RuntimeException();
+ }
+ };
+ try {
+ retriable.runWithDelay();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals(RuntimeException.class, e.getClass());
+ }
+ }
+
+ @Test
+ public void testRetrySuccessWithDelay() {
+ Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ private long startTime = System.currentTimeMillis();
+ @Override
+ public Void execute() {
+ executeWithDelay(startTime);
+ return null;
+ }
+ };
+ try {
+ retriable.runWithDelay();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ private void executeWithDelay(long startTime) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - startTime < 40 * 1000) {
+ throw new NullPointerException();
+ }
+ }
+
+ @Test
+ public void testRetryFailureWithDelayMoreThanTimeout() {
+ Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+ @Override
+ public Void execute() {
+ throw new NullPointerException();
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ try {
+ retriable.runWithDelay();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class, e.getClass());
+ Assert.assertTrue(System.currentTimeMillis() - startTime > 180 * 1000);
+ }
+ }
}