You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/07/27 03:34:32 UTC
[hive] branch master updated: HIVE-23863: UGI doAs privilege action
to make calls to Ranger Service (Aasha Medhi,
reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7298dc1 HIVE-23863: UGI doAs privilege action to make calls to Ranger Service (Aasha Medhi, reviewed by Pravin Kumar Sinha)
7298dc1 is described below
commit 7298dc162f3c04efc880659ffdf5b075606c2557
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Jul 27 09:04:20 2020 +0530
HIVE-23863: UGI doAs privilege action to make calls to Ranger Service (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
.../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 2 +
.../hadoop/hive/ql/exec/repl/AtlasLoadTask.java | 2 +
.../hadoop/hive/ql/exec/repl/RangerDumpTask.java | 2 +
.../hadoop/hive/ql/exec/repl/RangerLoadTask.java | 2 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 2 +
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 2 +
.../ql/exec/repl/atlas/AtlasRestClientImpl.java | 3 +
.../ql/exec/repl/ranger/RangerRestClientImpl.java | 191 +++++++++++++--------
.../repl/metric/ReplicationMetricCollector.java | 6 +
.../ql/exec/repl/ranger/TestRangerRestClient.java | 136 +++++++++++++++
.../metric/TestReplicationMetricCollector.java | 14 ++
.../hive/metastore/RetryingMetaStoreClient.java | 29 +---
.../hadoop/hive/metastore/utils/SecurityUtils.java | 25 +++
13 files changed, 316 insertions(+), 100 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 502dabb..94ea3c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -25,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -79,6 +80,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
@Override
public int execute() {
try {
+ SecurityUtils.reloginExpiringKeytabUser();
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index 534c85d..b24b3d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -68,6 +69,7 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
@Override
public int execute() {
try {
+ SecurityUtils.reloginExpiringKeytabUser();
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.ENTITIES.name(), 0L);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 92ca6ea..7a0259f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
@@ -80,6 +81,7 @@ public class RangerDumpTask extends Task<RangerDumpWork> implements Serializable
long exportCount = 0;
Path filePath = null;
LOG.info("Exporting Ranger Metadata");
+ SecurityUtils.reloginExpiringKeytabUser();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L);
work.getMetricCollector().reportStageStart(getName(), metricMap);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index fa57efd..20f8401 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
@@ -82,6 +83,7 @@ public class RangerLoadTask extends Task<RangerLoadWork> implements Serializable
LOG.info("Importing Ranger Metadata");
RangerExportPolicyList rangerExportPolicyList = null;
List<RangerPolicy> rangerPolicies = null;
+ SecurityUtils.reloginExpiringKeytabUser();
if (rangerRestClient == null) {
rangerRestClient = getRangerRestClient();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 8d76557..bb0ae1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -152,6 +153,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
@Override
public int execute() {
try {
+ SecurityUtils.reloginExpiringKeytabUser();
if (work.tableDataCopyIteratorsInitialized()) {
initiateDataCopyTasks();
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 87543f2..5ac9a05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
@@ -105,6 +106,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
@Override
public int execute() {
try {
+ SecurityUtils.reloginExpiringKeytabUser();
Task<?> rootTask = work.getRootTask();
if (rootTask != null) {
rootTask.setChildTasks(null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index c8d738e..7a3bf61 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -27,6 +27,7 @@ import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
@@ -82,6 +83,7 @@ public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClie
return invokeWithRetry(new Callable<InputStream>() {
@Override
public InputStream call() throws Exception {
+ SecurityUtils.reloginExpiringKeytabUser();
return clientV2.exportData(request);
}
}, null);
@@ -100,6 +102,7 @@ public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClie
public AtlasImportResult call() throws Exception {
InputStream is = null;
try {
+ SecurityUtils.reloginExpiringKeytabUser();
is = fs.open(exportFilePath);
return clientV2.importData(request, is);
} finally {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 91389ea..1b16723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.repl.ranger;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.sun.jersey.api.client.Client;
@@ -35,7 +36,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.util.MultiPartWriter;
import org.slf4j.Logger;
@@ -54,6 +57,8 @@ import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
@@ -80,34 +85,13 @@ public class RangerRestClientImpl implements RangerRestClient {
Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
@Override
public RangerExportPolicyList execute() throws Exception {
- String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName);
- LOG.debug("Url to export policies from source Ranger: {}", finalUrl);
- WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
- RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
- ClientResponse clientResp = builder.get(ClientResponse.class);
- String response = null;
- if (clientResp != null) {
- if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
- Gson gson = new GsonBuilder().create();
- response = clientResp.getEntity(String.class);
- LOG.debug("Response received for ranger export {} ", response);
- if (StringUtils.isNotEmpty(response)) {
- rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
- return rangerExportPolicyList;
- }
- } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
- LOG.debug("Ranger policy export request returned empty list");
- return rangerExportPolicyList;
- } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
- throw new SemanticException("Authentication Failure while communicating to Ranger admin");
- } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
- throw new SemanticException("Authorization Failure while communicating to Ranger admin");
- }
- }
- if (StringUtils.isEmpty(response)) {
- LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SecurityUtils.reloginExpiringKeytabUser();
+ return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<RangerExportPolicyList>) () ->
+ exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName));
+ } else {
+ return exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName);
}
- return null;
}
};
try {
@@ -117,6 +101,40 @@ public class RangerRestClientImpl implements RangerRestClient {
}
}
+ @VisibleForTesting
+ RangerExportPolicyList exportRangerPoliciesPlain(String sourceRangerEndpoint,
+ String rangerHiveServiceName,
+ String dbName) throws SemanticException, URISyntaxException {
+ String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName);
+ LOG.debug("Url to export policies from source Ranger: {}", finalUrl);
+ WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+ RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
+ ClientResponse clientResp = builder.get(ClientResponse.class);
+ String response = null;
+ if (clientResp != null) {
+ if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
+ Gson gson = new GsonBuilder().create();
+ response = clientResp.getEntity(String.class);
+ LOG.debug("Response received for ranger export {} ", response);
+ if (StringUtils.isNotEmpty(response)) {
+ rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
+ return rangerExportPolicyList;
+ }
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+ LOG.debug("Ranger policy export request returned empty list");
+ return rangerExportPolicyList;
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+ throw new SemanticException("Authentication Failure while communicating to Ranger admin");
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
+ throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+ }
+ }
+ if (StringUtils.isEmpty(response)) {
+ LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+ }
+ return null;
+ }
+
public String getRangerExportUrl(String sourceRangerEndpoint, String rangerHiveServiceName,
String dbName) throws URISyntaxException {
URIBuilder uriBuilder = new URIBuilder(sourceRangerEndpoint);
@@ -185,50 +203,15 @@ public class RangerRestClientImpl implements RangerRestClient {
Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
@Override
public RangerExportPolicyList execute() throws Exception {
- ClientResponse clientResp = null;
-
- StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
- new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
- rangerPoliciesJsonFileName);
- StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
- new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
-
- FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
- MultiPart multipartEntity = null;
- try {
- multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
- WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
- clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
- .post(ClientResponse.class, multipartEntity);
- if (clientResp != null) {
- if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
- LOG.debug("Ranger policy import finished successfully");
-
- } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
- throw new Exception("Authentication Failure while communicating to Ranger admin");
- } else {
- throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
- }
- }
- } finally {
- try {
- if (filePartPolicies != null) {
- filePartPolicies.cleanup();
- }
- if (filePartServiceMap != null) {
- filePartServiceMap.cleanup();
- }
- if (formDataMultiPart != null) {
- formDataMultiPart.close();
- }
- if (multipartEntity != null) {
- multipartEntity.close();
- }
- } catch (IOException e) {
- LOG.error("Exception occurred while closing resources: {}", e);
- }
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SecurityUtils.reloginExpiringKeytabUser();
+ return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<RangerExportPolicyList>) () ->
+ importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName,
+ serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList));
+ } else {
+ return importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName,
+ serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList);
}
- return rangerExportPolicyList;
}
};
try {
@@ -238,6 +221,56 @@ public class RangerRestClientImpl implements RangerRestClient {
}
}
+ private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExportPolicyList,
+ String rangerPoliciesJsonFileName,
+ String serviceMapJsonFileName, String jsonServiceMap,
+ String finalUrl, RangerExportPolicyList
+ rangerExportPolicyList) throws Exception {
+ ClientResponse clientResp = null;
+ StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
+ new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
+ rangerPoliciesJsonFileName);
+ StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
+ new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
+
+ FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
+ MultiPart multipartEntity = null;
+ try {
+ multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
+ WebResource.Builder builder = getRangerResourceBuilder(finalUrl);
+ clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
+ .post(ClientResponse.class, multipartEntity);
+ if (clientResp != null) {
+ if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+ LOG.debug("Ranger policy import finished successfully");
+
+ } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+ throw new Exception("Authentication Failure while communicating to Ranger admin");
+ } else {
+ throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
+ }
+ }
+ } finally {
+ try {
+ if (filePartPolicies != null) {
+ filePartPolicies.cleanup();
+ }
+ if (filePartServiceMap != null) {
+ filePartServiceMap.cleanup();
+ }
+ if (formDataMultiPart != null) {
+ formDataMultiPart.close();
+ }
+ if (multipartEntity != null) {
+ multipartEntity.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Exception occurred while closing resources: {}", e);
+ }
+ }
+ return rangerExportPolicyList;
+ }
+
public String getRangerImportUrl(String rangerUrl, String dbName) throws URISyntaxException {
URIBuilder uriBuilder = new URIBuilder(rangerUrl);
uriBuilder.setPath(RANGER_REST_URL_IMPORTJSONFILE);
@@ -376,10 +409,12 @@ public class RangerRestClientImpl implements RangerRestClient {
Retry<Boolean> retriable = new Retry<Boolean>(Exception.class) {
@Override
public Boolean execute() throws Exception {
- WebResource.Builder builder;
- builder = getRangerResourceBuilder(url);
- ClientResponse clientResp = builder.get(ClientResponse.class);
- return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SecurityUtils.reloginExpiringKeytabUser();
+ return UserGroupInformation.getLoginUser().doAs((PrivilegedAction<Boolean>) () -> checkConnectionPlain(url));
+ } else {
+ return checkConnectionPlain(url);
+ }
}
};
try {
@@ -389,6 +424,14 @@ public class RangerRestClientImpl implements RangerRestClient {
}
}
+ @VisibleForTesting
+ boolean checkConnectionPlain(String url) {
+ WebResource.Builder builder;
+ builder = getRangerResourceBuilder(url);
+ ClientResponse clientResp = builder.get(ClientResponse.class);
+ return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+ }
+
@Override
public List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName,
String sourceDb, String targetDb) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 61cc348..ba19e28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -77,6 +77,9 @@ public abstract class ReplicationMetricCollector {
Stage stage = progress.getStageByName(stageName);
stage.setStatus(status);
stage.setEndTime(System.currentTimeMillis());
+ if (Status.FAILED == status) {
+ progress.setStatus(Status.FAILED);
+ }
replicationMetric.setProgress(progress);
Metadata metadata = replicationMetric.getMetadata();
metadata.setLastReplId(lastReplId);
@@ -92,6 +95,9 @@ public abstract class ReplicationMetricCollector {
Stage stage = progress.getStageByName(stageName);
stage.setStatus(status);
stage.setEndTime(System.currentTimeMillis());
+ if (Status.FAILED == status) {
+ progress.setStatus(Status.FAILED);
+ }
replicationMetric.setProgress(progress);
metricCollector.addMetric(replicationMetric);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
new file mode 100644
index 0000000..12afd8e
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Unit test class for testing Ranger Dump.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({UserGroupInformation.class})
+public class TestRangerRestClient {
+
+ @Mock
+ private RangerRestClientImpl mockClient;
+
+ @Mock
+ private UserGroupInformation userGroupInformation;
+
+ @Before
+ public void setup() throws Exception {
+ PowerMockito.mockStatic(UserGroupInformation.class);
+ Mockito.when(UserGroupInformation.getLoginUser()).thenReturn(userGroupInformation);
+ Mockito.when(userGroupInformation.doAs((PrivilegedAction<Object>) Mockito.any())).thenCallRealMethod();
+ Mockito.when(mockClient.getRangerExportUrl(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenCallRealMethod();
+ Mockito.when(mockClient.getRangerImportUrl(Mockito.anyString(), Mockito.anyString()))
+ .thenCallRealMethod();
+ }
+
+ @Test
+ public void testSuccessSimpleAuthCheckConnection() throws Exception {
+ Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
+ Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true);
+ Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod();
+ mockClient.checkConnection("http://localhost:6080/ranger");
+ ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
+ Mockito.verify(mockClient,
+ Mockito.times(1)).checkConnectionPlain(urlCaptor.capture());
+ Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
+ ArgumentCaptor<PrivilegedAction> privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class);
+ Mockito.verify(userGroupInformation,
+ Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture());
+ }
+
+ @Test
+ public void testSuccessKerberosAuthCheckConnection() throws Exception {
+ Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true);
+ Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true);
+ Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod();
+ mockClient.checkConnection("http://localhost:6080/ranger");
+ ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
+ Mockito.verify(mockClient,
+ Mockito.times(1)).checkConnectionPlain(urlCaptor.capture());
+ Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
+ ArgumentCaptor<PrivilegedAction> privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class);
+ Mockito.verify(userGroupInformation,
+ Mockito.times(3)).doAs(privilegedActionArgumentCaptor.capture());
+ }
+
+ @Test
+ public void testSuccessSimpleAuthRangerExport() throws Exception {
+ Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false);
+ Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(new RangerExportPolicyList());
+ Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenCallRealMethod();
+ mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db",
+ "hive");
+ ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> dbCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> serviceCaptor = ArgumentCaptor.forClass(String.class);
+ Mockito.verify(mockClient,
+ Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(),
+ serviceCaptor.capture());
+ Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
+ Assert.assertEquals("db", dbCaptor.getValue());
+ Assert.assertEquals("hive", serviceCaptor.getValue());
+ ArgumentCaptor<PrivilegedAction> privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class);
+ Mockito.verify(userGroupInformation,
+ Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture());
+ }
+
+ @Test
+ public void testSuccessKerberosAuthRangerExport() throws Exception {
+ Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true);
+ Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(new RangerExportPolicyList());
+ Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenCallRealMethod();
+ mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db",
+ "hive");
+ ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> dbCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> serviceCaptor = ArgumentCaptor.forClass(String.class);
+ Mockito.verify(mockClient,
+ Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(),
+ serviceCaptor.capture());
+ Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue());
+ Assert.assertEquals("db", dbCaptor.getValue());
+ Assert.assertEquals("hive", serviceCaptor.getValue());
+ ArgumentCaptor<PrivilegedExceptionAction> privilegedActionArgumentCaptor = ArgumentCaptor
+ .forClass(PrivilegedExceptionAction.class);
+ Mockito.verify(userGroupInformation,
+ Mockito.times(1)).doAs(privilegedActionArgumentCaptor.capture());
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
index 95de5a8..625a6e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
@@ -303,4 +303,18 @@ public class TestReplicationMetricCollector {
}
}
+ @Test
+ public void testSuccessStageFailure() throws Exception {
+ ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db",
+ "staging", conf);
+ Map<String, Long> metricMap = new HashMap<>();
+ metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
+ metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
+ bootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+ bootstrapDumpMetricCollector.reportStageEnd("dump", Status.FAILED);
+ List<ReplicationMetric> metricList = MetricCollector.getInstance().getMetrics();
+ Assert.assertEquals(1, metricList.size());
+ ReplicationMetric actualMetric = metricList.get(0);
+ Assert.assertEquals(Status.FAILED, actualMetric.getProgress().getStatus());
+ }
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
index bf47d1c..d56365b 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
@@ -93,7 +94,7 @@ public class RetryingMetaStoreClient implements InvocationHandler {
String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS);
localMetaStore = (msUri == null) || msUri.trim().isEmpty();
- reloginExpiringKeytabUser();
+ SecurityUtils.reloginExpiringKeytabUser();
this.base = JavaUtils.newInstance(msClientClass, constructorArgTypes, constructorArgs);
@@ -174,7 +175,7 @@ public class RetryingMetaStoreClient implements InvocationHandler {
while (true) {
try {
- reloginExpiringKeytabUser();
+ SecurityUtils.reloginExpiringKeytabUser();
if (allowReconnect) {
if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
@@ -325,28 +326,4 @@ public class RetryingMetaStoreClient implements InvocationHandler {
return shouldReconnect;
}
- /**
- * Relogin if login user is logged in using keytab
- * Relogin is actually done by ugi code only if sufficient time has passed
- * A no-op if kerberos security is not enabled
- * @throws MetaException
- */
- private void reloginExpiringKeytabUser() throws MetaException {
- if(!UserGroupInformation.isSecurityEnabled()){
- return;
- }
- try {
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry
- //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x)
- if(ugi.isFromKeytab()){
- ugi.checkTGTAndReloginFromKeytab();
- }
- } catch (IOException e) {
- String msg = "Error doing relogin using keytab " + e.getMessage();
- LOG.error(msg, e);
- throw new MetaException(msg);
- }
- }
-
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
index bae1ec3..5ce340f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.utils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector;
import org.apache.hadoop.io.Text;
@@ -267,4 +268,28 @@ public class SecurityUtils {
sslSocket.setSSLParameters(sslParams);
return new TSocket(sslSocket);
}
+
+ /**
+ * Relogin if login user is logged in using keytab
+ * Relogin is actually done by ugi code only if sufficient time has passed
+ * A no-op if kerberos security is not enabled
+ * @throws MetaException
+ */
+ public static void reloginExpiringKeytabUser() throws MetaException {
+ if(!UserGroupInformation.isSecurityEnabled()){
+ return;
+ }
+ try {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry
+ //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x)
+ if(ugi.isFromKeytab()){
+ ugi.checkTGTAndReloginFromKeytab();
+ }
+ } catch (IOException e) {
+ String msg = "Error doing relogin using keytab " + e.getMessage();
+ LOG.error(msg, e);
+ throw new MetaException(msg);
+ }
+ }
}