You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/12/01 10:35:47 UTC

[hive] branch master updated: HIVE-24417: Add config options for Atlas and Ranger client timeouts (Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2ac3d  HIVE-24417:Add config options for Atlas and Ranger client timeouts (Pravin Kumar Sinha, reviewed by Aasha Medhi)
cb2ac3d is described below

commit cb2ac3dcc6af276c6f64ee00f034f082fe75222b
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Tue Dec 1 16:05:29 2020 +0530

    HIVE-24417:Add config options for Atlas and Ranger client timeouts (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 +++++
 .../ql/exec/repl/atlas/AtlasRestClientBuilder.java | 11 ++++++--
 .../ql/exec/repl/ranger/RangerRestClientImpl.java  | 31 +++++++++++++---------
 .../hive/ql/exec/repl/TestAtlasDumpTask.java       | 26 +++++++++++++++++-
 .../ql/exec/repl/ranger/TestRangerRestClient.java  | 22 ++++++++++++---
 5 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 40312b7..dedb37f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -613,12 +613,18 @@ public class HiveConf extends Configuration {
       true,
       "This configuration will add a deny policy on the target database for all users except hive"
         + " to avoid any update to the target database"),
+    REPL_RANGER_CLIENT_READ_TIMEOUT("hive.repl.ranger.client.read.timeout", "300s",
+            new TimeValidator(TimeUnit.SECONDS), "Ranger client read timeout for Ranger REST API calls."),
     REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false,
             "Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."),
     REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null,
             "Atlas endpoint of the current cluster hive database is getting replicated from/to."),
     REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null,
             "Target hive database name Atlas metadata of source hive database is being replicated to."),
+    REPL_ATLAS_CLIENT_READ_TIMEOUT("hive.repl.atlas.client.read.timeout", "7200s",
+            new TimeValidator(TimeUnit.SECONDS), "Atlas client read timeout for Atlas REST API calls."),
+    REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT("hive.repl.external.client.connect.timeout", "10s",
+            new TimeValidator(TimeUnit.SECONDS), "Client connect timeout for REST API calls to external service."),
     REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null,
             "Name of the source cluster for the replication."),
     REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
index fa0ca5f..75cb45b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Builder for AtlasRestClient.
@@ -42,6 +43,8 @@ public class AtlasRestClientBuilder {
   private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
   private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
   private static final String URL_SEPERATOR = ",";
+  public static final String ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS = "atlas.client.connectTimeoutMSecs";
+  public static final String ATLAS_PROPERTY_READ_TIMEOUT_IN_MS = "atlas.client.readTimeoutMSecs";
 
   private UserGroupInformation userGroupInformation;
   protected String incomingUrl;
@@ -69,7 +72,7 @@ public class AtlasRestClientBuilder {
         ReplUtils.REPL_ATLAS_SERVICE));
     }
     setUGInfo();
-    initializeAtlasApplicationProperties();
+    initializeAtlasApplicationProperties(conf);
     AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
             this.userGroupInformation.getShortUserName(), baseUrls);
     return new AtlasRestClientImpl(clientV2, conf);
@@ -85,9 +88,13 @@ public class AtlasRestClientBuilder {
     return this;
   }
 
-  private void initializeAtlasApplicationProperties() throws SemanticException {
+  private void initializeAtlasApplicationProperties(HiveConf conf) throws SemanticException {
     try {
       Properties props = new Properties();
+      props.setProperty(ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS, String.valueOf(
+              conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)));
+      props.setProperty(ATLAS_PROPERTY_READ_TIMEOUT_IN_MS, String.valueOf(
+              conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)));
       props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1");
       props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0");
       props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 459c95f..b14a44f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -69,6 +69,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Arrays;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * RangerRestClientImpl to connect to Ranger and export policies.
@@ -93,7 +94,7 @@ public class RangerRestClientImpl implements RangerRestClient {
       .withRetryOnException(Exception.class).build();
     try {
       return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName,
-        dbName));
+        dbName, hiveConf));
     } catch (Exception e) {
       throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
@@ -102,10 +103,11 @@ public class RangerRestClientImpl implements RangerRestClient {
   @VisibleForTesting
   RangerExportPolicyList exportRangerPoliciesPlain(String sourceRangerEndpoint,
                                                            String rangerHiveServiceName,
-                                                           String dbName) throws SemanticException, URISyntaxException {
+                                                           String dbName, HiveConf hiveConf)
+          throws SemanticException, URISyntaxException {
     String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName);
     LOG.debug("Url to export policies from source Ranger: {}", finalUrl);
-    WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+    WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf);
     RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
     ClientResponse clientResp = builder.get(ClientResponse.class);
     String response = null;
@@ -205,7 +207,7 @@ public class RangerRestClientImpl implements RangerRestClient {
     try {
       return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList,
               rangerPoliciesJsonFileName,
-              serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList));
+              serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList, hiveConf));
     } catch (Exception e) {
       throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
@@ -215,7 +217,7 @@ public class RangerRestClientImpl implements RangerRestClient {
                                                            String rangerPoliciesJsonFileName,
                                                            String serviceMapJsonFileName, String jsonServiceMap,
                                                            String finalUrl, RangerExportPolicyList
-                                                             rangerExportPolicyList) throws Exception {
+                                                           rangerExportPolicyList, HiveConf hiveConf) throws Exception {
     ClientResponse clientResp = null;
     StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
       new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
@@ -227,7 +229,7 @@ public class RangerRestClientImpl implements RangerRestClient {
     MultiPart multipartEntity = null;
     try {
       multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
-      WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+      WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf);
       clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
         .post(ClientResponse.class, multipartEntity);
       if (clientResp != null) {
@@ -270,11 +272,16 @@ public class RangerRestClientImpl implements RangerRestClient {
     return uriBuilder.build().toString();
   }
 
-  private synchronized Client getRangerClient() {
+  @VisibleForTesting
+  synchronized Client getRangerClient(HiveConf hiveConf) {
     Client ret = null;
     ClientConfig config = new DefaultClientConfig();
     config.getClasses().add(MultiPartWriter.class);
     config.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true);
+    config.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT,
+            (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS));
+    config.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT,
+            (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS));
     ret = Client.create(config);
     return ret;
   }
@@ -398,16 +405,16 @@ public class RangerRestClientImpl implements RangerRestClient {
       .withHiveConf(hiveConf)
       .withRetryOnException(Exception.class).build();
     try {
-      return retryable.executeCallable(() -> checkConnectionPlain(url));
+      return retryable.executeCallable(() -> checkConnectionPlain(url, hiveConf));
     } catch (Exception e) {
       throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
   }
 
   @VisibleForTesting
-  boolean checkConnectionPlain(String url) {
+  boolean checkConnectionPlain(String url, HiveConf hiveConf) {
     WebResource.Builder builder;
-    builder = getRangerResourceBuilder(url);
+    builder = getRangerResourceBuilder(url, hiveConf);
     ClientResponse clientResp = builder.get(ClientResponse.class);
     return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
   }
@@ -478,8 +485,8 @@ public class RangerRestClientImpl implements RangerRestClient {
   }
 
 
-  private WebResource.Builder getRangerResourceBuilder(String url) {
-    Client client = getRangerClient();
+  private WebResource.Builder getRangerResourceBuilder(String url, HiveConf hiveConf) {
+    Client client = getRangerClient(hiveConf);
     WebResource webResource = client.resource(url);
     WebResource.Builder builder = webResource.getRequestBuilder();
     return builder;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index 5f79db0..99a51ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -42,6 +43,7 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
@@ -55,6 +57,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
@@ -65,7 +68,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
  * Unit test class for testing Atlas metadata Dump.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({LoggerFactory.class, UserGroupInformation.class})
+@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class})
 public class TestAtlasDumpTask {
 
   @Mock
@@ -218,6 +221,27 @@ public class TestAtlasDumpTask {
     Mockito.verify(atlasClientV2, Mockito.times(4)).getServer(getServerReqCaptor.capture());
   }
 
+  @Test
+  public void testAtlasClientTimeouts() throws Exception {
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT,
+            TimeUnit.MILLISECONDS)).thenReturn(20L);
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)).thenReturn(500L);
+    mockStatic(UserGroupInformation.class);
+    when(UserGroupInformation.getLoginUser()).thenReturn(mock(UserGroupInformation.class));
+    mockStatic(ConfigurationConverter.class);
+    when(ConfigurationConverter.getConfiguration(Mockito.any(Properties.class))).thenCallRealMethod();
+    AtlasRestClientBuilder atlasRestCleintBuilder = new AtlasRestClientBuilder("http://localhost:31000");
+    AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf);
+    Assert.assertTrue(atlasClient != null);
+    ArgumentCaptor<Properties> propsCaptor = ArgumentCaptor.forClass(Properties.class);
+    PowerMockito.verifyStatic(ConfigurationConverter.class, Mockito.times(1));
+    ConfigurationConverter.getConfiguration(propsCaptor.capture());
+    Assert.assertEquals("20", propsCaptor.getValue().getProperty(
+            AtlasRestClientBuilder.ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS));
+    Assert.assertEquals("500", propsCaptor.getValue().getProperty(
+            AtlasRestClientBuilder.ATLAS_PROPERTY_READ_TIMEOUT_IN_MS));
+  }
+
   private void setupConfForRetry() {
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L);
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
index 5f41488..10825f3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.ranger;
 
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
@@ -74,12 +76,12 @@ public class TestRangerRestClient {
   @Test
   public void testSuccessSimpleAuthCheckConnection() throws Exception {
     Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
-    Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true);
+    Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString(), Mockito.any(HiveConf.class))).thenReturn(true);
     Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenCallRealMethod();
     mockClient.checkConnection("http://localhost:6080/ranger", conf);
     ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
     Mockito.verify(mockClient,
-      Mockito.times(1)).checkConnectionPlain(urlCaptor.capture());
+      Mockito.times(1)).checkConnectionPlain(urlCaptor.capture(), Mockito.any(HiveConf.class));
     Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
     ArgumentCaptor<PrivilegedAction> privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class);
     Mockito.verify(userGroupInformation,
@@ -89,8 +91,8 @@ public class TestRangerRestClient {
   @Test
   public void testSuccessSimpleAuthRangerExport() throws Exception {
     Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
-    Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
-      .thenReturn(new RangerExportPolicyList());
+    Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
+            Mockito.any(HiveConf.class))).thenReturn(new RangerExportPolicyList());
     Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
       Mockito.any()))
       .thenCallRealMethod();
@@ -110,4 +112,16 @@ public class TestRangerRestClient {
     Mockito.verify(userGroupInformation,
       Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture());
   }
+
+  @Test
+  public void testRangerClientTimeouts() {
+    Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT,
+            TimeUnit.MILLISECONDS)).thenReturn(20L);
+    Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT,
+            TimeUnit.MILLISECONDS)).thenReturn(500L);
+    Mockito.when(mockClient.getRangerClient(Mockito.any(HiveConf.class))).thenCallRealMethod();
+    Client client =mockClient.getRangerClient(conf);
+    Assert.assertEquals(20, client.getProperties().get(ClientConfig.PROPERTY_CONNECT_TIMEOUT));
+    Assert.assertEquals(500, client.getProperties().get(ClientConfig.PROPERTY_READ_TIMEOUT));
+  }
 }