You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/12/01 10:35:47 UTC
[hive] branch master updated: HIVE-24417: Add config options for
Atlas and Ranger client timeouts (Pravin Kumar Sinha,
reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cb2ac3d HIVE-24417: Add config options for Atlas and Ranger client timeouts (Pravin Kumar Sinha, reviewed by Aasha Medhi)
cb2ac3d is described below
commit cb2ac3dcc6af276c6f64ee00f034f082fe75222b
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Tue Dec 1 16:05:29 2020 +0530
HIVE-24417: Add config options for Atlas and Ranger client timeouts (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 6 +++++
.../ql/exec/repl/atlas/AtlasRestClientBuilder.java | 11 ++++++--
.../ql/exec/repl/ranger/RangerRestClientImpl.java | 31 +++++++++++++---------
.../hive/ql/exec/repl/TestAtlasDumpTask.java | 26 +++++++++++++++++-
.../ql/exec/repl/ranger/TestRangerRestClient.java | 22 ++++++++++++---
5 files changed, 77 insertions(+), 19 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 40312b7..dedb37f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -613,12 +613,18 @@ public class HiveConf extends Configuration {
true,
"This configuration will add a deny policy on the target database for all users except hive"
+ " to avoid any update to the target database"),
+ REPL_RANGER_CLIENT_READ_TIMEOUT("hive.repl.ranger.client.read.timeout", "300s",
+ new TimeValidator(TimeUnit.SECONDS), "Ranger client read timeout for Ranger REST API calls."),
REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false,
"Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."),
REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null,
"Atlas endpoint of the current cluster hive database is getting replicated from/to."),
REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null,
"Target hive database name Atlas metadata of source hive database is being replicated to."),
+ REPL_ATLAS_CLIENT_READ_TIMEOUT("hive.repl.atlas.client.read.timeout", "7200s",
+ new TimeValidator(TimeUnit.SECONDS), "Atlas client read timeout for Atlas REST API calls."),
+ REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT("hive.repl.external.client.connect.timeout", "10s",
+ new TimeValidator(TimeUnit.SECONDS), "Client connect timeout for REST API calls to external service."),
REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null,
"Name of the source cluster for the replication."),
REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
index fa0ca5f..75cb45b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
/**
* Builder for AtlasRestClient.
@@ -42,6 +43,8 @@ public class AtlasRestClientBuilder {
private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
private static final String URL_SEPERATOR = ",";
+ public static final String ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS = "atlas.client.connectTimeoutMSecs";
+ public static final String ATLAS_PROPERTY_READ_TIMEOUT_IN_MS = "atlas.client.readTimeoutMSecs";
private UserGroupInformation userGroupInformation;
protected String incomingUrl;
@@ -69,7 +72,7 @@ public class AtlasRestClientBuilder {
ReplUtils.REPL_ATLAS_SERVICE));
}
setUGInfo();
- initializeAtlasApplicationProperties();
+ initializeAtlasApplicationProperties(conf);
AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
this.userGroupInformation.getShortUserName(), baseUrls);
return new AtlasRestClientImpl(clientV2, conf);
@@ -85,9 +88,13 @@ public class AtlasRestClientBuilder {
return this;
}
- private void initializeAtlasApplicationProperties() throws SemanticException {
+ private void initializeAtlasApplicationProperties(HiveConf conf) throws SemanticException {
try {
Properties props = new Properties();
+ props.setProperty(ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS, String.valueOf(
+ conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)));
+ props.setProperty(ATLAS_PROPERTY_READ_TIMEOUT_IN_MS, String.valueOf(
+ conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)));
props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1");
props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0");
props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 459c95f..b14a44f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -69,6 +69,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Arrays;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
/**
* RangerRestClientImpl to connect to Ranger and export policies.
@@ -93,7 +94,7 @@ public class RangerRestClientImpl implements RangerRestClient {
.withRetryOnException(Exception.class).build();
try {
return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName,
- dbName));
+ dbName, hiveConf));
} catch (Exception e) {
throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
@@ -102,10 +103,11 @@ public class RangerRestClientImpl implements RangerRestClient {
@VisibleForTesting
RangerExportPolicyList exportRangerPoliciesPlain(String sourceRangerEndpoint,
String rangerHiveServiceName,
- String dbName) throws SemanticException, URISyntaxException {
+ String dbName, HiveConf hiveConf)
+ throws SemanticException, URISyntaxException {
String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName);
LOG.debug("Url to export policies from source Ranger: {}", finalUrl);
- WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+ WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf);
RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
ClientResponse clientResp = builder.get(ClientResponse.class);
String response = null;
@@ -205,7 +207,7 @@ public class RangerRestClientImpl implements RangerRestClient {
try {
return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList,
rangerPoliciesJsonFileName,
- serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList));
+ serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList, hiveConf));
} catch (Exception e) {
throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
@@ -215,7 +217,7 @@ public class RangerRestClientImpl implements RangerRestClient {
String rangerPoliciesJsonFileName,
String serviceMapJsonFileName, String jsonServiceMap,
String finalUrl, RangerExportPolicyList
- rangerExportPolicyList) throws Exception {
+ rangerExportPolicyList, HiveConf hiveConf) throws Exception {
ClientResponse clientResp = null;
StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
@@ -227,7 +229,7 @@ public class RangerRestClientImpl implements RangerRestClient {
MultiPart multipartEntity = null;
try {
multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
- WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+ WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf);
clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
.post(ClientResponse.class, multipartEntity);
if (clientResp != null) {
@@ -270,11 +272,16 @@ public class RangerRestClientImpl implements RangerRestClient {
return uriBuilder.build().toString();
}
- private synchronized Client getRangerClient() {
+ @VisibleForTesting
+ synchronized Client getRangerClient(HiveConf hiveConf) {
Client ret = null;
ClientConfig config = new DefaultClientConfig();
config.getClasses().add(MultiPartWriter.class);
config.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true);
+ config.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT,
+ (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS));
+ config.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT,
+ (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS));
ret = Client.create(config);
return ret;
}
@@ -398,16 +405,16 @@ public class RangerRestClientImpl implements RangerRestClient {
.withHiveConf(hiveConf)
.withRetryOnException(Exception.class).build();
try {
- return retryable.executeCallable(() -> checkConnectionPlain(url));
+ return retryable.executeCallable(() -> checkConnectionPlain(url, hiveConf));
} catch (Exception e) {
throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
@VisibleForTesting
- boolean checkConnectionPlain(String url) {
+ boolean checkConnectionPlain(String url, HiveConf hiveConf) {
WebResource.Builder builder;
- builder = getRangerResourceBuilder(url);
+ builder = getRangerResourceBuilder(url, hiveConf);
ClientResponse clientResp = builder.get(ClientResponse.class);
return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
}
@@ -478,8 +485,8 @@ public class RangerRestClientImpl implements RangerRestClient {
}
- private WebResource.Builder getRangerResourceBuilder(String url) {
- Client client = getRangerClient();
+ private WebResource.Builder getRangerResourceBuilder(String url, HiveConf hiveConf) {
+ Client client = getRangerClient(hiveConf);
WebResource webResource = client.resource(url);
WebResource.Builder builder = webResource.getRequestBuilder();
return builder;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index 5f79db0..99a51ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -42,6 +43,7 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
@@ -55,6 +57,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
@@ -65,7 +68,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
* Unit test class for testing Atlas metadata Dump.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({LoggerFactory.class, UserGroupInformation.class})
+@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class})
public class TestAtlasDumpTask {
@Mock
@@ -218,6 +221,27 @@ public class TestAtlasDumpTask {
Mockito.verify(atlasClientV2, Mockito.times(4)).getServer(getServerReqCaptor.capture());
}
+ @Test
+ public void testAtlasClientTimeouts() throws Exception {
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT,
+ TimeUnit.MILLISECONDS)).thenReturn(20L);
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)).thenReturn(500L);
+ mockStatic(UserGroupInformation.class);
+ when(UserGroupInformation.getLoginUser()).thenReturn(mock(UserGroupInformation.class));
+ mockStatic(ConfigurationConverter.class);
+ when(ConfigurationConverter.getConfiguration(Mockito.any(Properties.class))).thenCallRealMethod();
+ AtlasRestClientBuilder atlasRestCleintBuilder = new AtlasRestClientBuilder("http://localhost:31000");
+ AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf);
+ Assert.assertTrue(atlasClient != null);
+ ArgumentCaptor<Properties> propsCaptor = ArgumentCaptor.forClass(Properties.class);
+ PowerMockito.verifyStatic(ConfigurationConverter.class, Mockito.times(1));
+ ConfigurationConverter.getConfiguration(propsCaptor.capture());
+ Assert.assertEquals("20", propsCaptor.getValue().getProperty(
+ AtlasRestClientBuilder.ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS));
+ Assert.assertEquals("500", propsCaptor.getValue().getProperty(
+ AtlasRestClientBuilder.ATLAS_PROPERTY_READ_TIMEOUT_IN_MS));
+ }
+
private void setupConfForRetry() {
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L);
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
index 5f41488..10825f3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec.repl.ranger;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
@@ -74,12 +76,12 @@ public class TestRangerRestClient {
@Test
public void testSuccessSimpleAuthCheckConnection() throws Exception {
Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
- Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true);
+ Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString(), Mockito.any(HiveConf.class))).thenReturn(true);
Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenCallRealMethod();
mockClient.checkConnection("http://localhost:6080/ranger", conf);
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockClient,
- Mockito.times(1)).checkConnectionPlain(urlCaptor.capture());
+ Mockito.times(1)).checkConnectionPlain(urlCaptor.capture(), Mockito.any(HiveConf.class));
Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
ArgumentCaptor<PrivilegedAction> privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class);
Mockito.verify(userGroupInformation,
@@ -89,8 +91,8 @@ public class TestRangerRestClient {
@Test
public void testSuccessSimpleAuthRangerExport() throws Exception {
Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
- Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
- .thenReturn(new RangerExportPolicyList());
+ Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
+ Mockito.any(HiveConf.class))).thenReturn(new RangerExportPolicyList());
Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.any()))
.thenCallRealMethod();
@@ -110,4 +112,16 @@ public class TestRangerRestClient {
Mockito.verify(userGroupInformation,
Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture());
}
+
+ @Test
+ public void testRangerClientTimeouts() {
+ Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT,
+ TimeUnit.MILLISECONDS)).thenReturn(20L);
+ Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT,
+ TimeUnit.MILLISECONDS)).thenReturn(500L);
+ Mockito.when(mockClient.getRangerClient(Mockito.any(HiveConf.class))).thenCallRealMethod();
+ Client client =mockClient.getRangerClient(conf);
+ Assert.assertEquals(20, client.getProperties().get(ClientConfig.PROPERTY_CONNECT_TIMEOUT));
+ Assert.assertEquals(500, client.getProperties().get(ClientConfig.PROPERTY_READ_TIMEOUT));
+ }
}