You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2020/10/12 16:52:20 UTC
[ranger] branch master updated: RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
This is an automated email from the ASF dual-hosted git repository.
rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new 1d8ff16 RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
1d8ff16 is described below
commit 1d8ff1603674b967a4372e8963136945432e98f5
Author: Ramesh Mani <rm...@cloudera.com>
AuthorDate: Mon Oct 12 09:17:34 2020 -0700
RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
---
.../services/kafka/client/ServiceKafkaClient.java | 119 +++++++++++++++------
.../kafka/client/ServiceKafkaConnectionMgr.java | 67 ++++++++++--
2 files changed, 147 insertions(+), 39 deletions(-)
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 91a7d27..1144081 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -19,25 +19,25 @@
package org.apache.ranger.services.kafka.client;
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.log4j.Logger;
import org.apache.ranger.plugin.client.BaseClient;
import org.apache.ranger.plugin.service.ResourceLookupContext;
import org.apache.ranger.plugin.util.TimedEventUtil;
-import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
-import scala.Option;
-import scala.collection.Iterator;
-
public class ServiceKafkaClient {
private static final Logger LOG = Logger.getLogger(ServiceKafkaClient.class);
@@ -45,21 +45,32 @@ public class ServiceKafkaClient {
TOPIC
}
- String serviceName = null;
- String zookeeperConnect = null;
+ String serviceName;
+ Map<String,String > configs;
private static final String errMessage = " You can still save the repository and start creating "
+ "policies, but you would not be able to use autocomplete for "
+ "resource names. Check server logs for more info.";
- private static final String TOPIC_KEY = "topic";
- private static final long LOOKUP_TIMEOUT_SEC = 5;
-
- public ServiceKafkaClient(String serviceName, String zookeeperConnect) {
+ // Key used to fetch the topic list from the lookup resource map; also the switch label for topic lookups.
+ private static final String TOPIC_KEY = "topic";
+ // Presumably the lookup timeout in seconds; its use lies outside the visible hunks -- TODO confirm.
+ private static final long LOOKUP_TIMEOUT_SEC = 5;
+ // Property names passed to the Kafka AdminClient.
+ private static final String KEY_SASL_MECHANISM = "sasl.mechanism";
+ private static final String KEY_SASL_JAAS_CONFIG = "sasl.jaas.config";
+ // Service-config keys whose values getJAASConfig() inserts into the JAAS entry.
+ private static final String KEY_KAFKA_KEYTAB = "kafka.keytab";
+ private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
+ // Fragments of the JAAS entry; getJAASConfig() joins them with single spaces.
+ private static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
+ private static final String JAAS_USE_KEYTAB = "useKeyTab=true";
+ // Opens a quoted value; getJAASConfig() appends the keytab path and the closing quote.
+ private static final String JAAS_KEYTAB = "keyTab=\"";
+ // NOTE(review): the name is misspelled ("STOKE" for "STORE"); the value itself is the intended storeKey option.
+ private static final String JAAS_STOKE_KEY = "storeKey=true";
+ private static final String JAAS_SERVICE_NAME = "serviceName=kafka";
+ private static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false";
+ // Opens a quoted value; getJAASConfig() appends the principal, the closing quote and the terminating ';'.
+ private static final String JAAS_PRINCIPAL = "principal=\"";
+
+ /**
+ * Creates a client for the named service.
+ *
+ * @param serviceName name stored on this client and reported by {@link #toString()}
+ * @param configs service configuration map; the reference is kept as-is, not copied
+ */
+ public ServiceKafkaClient(String serviceName, Map<String,String> configs) {
this.serviceName = serviceName;
- this.zookeeperConnect = zookeeperConnect;
+ this.configs = configs;
}
- public Map<String, Object> connectionTest() throws Exception {
+ public Map<String, Object> connectionTest() {
String errMsg = errMessage;
Map<String, Object> responseData = new HashMap<String, Object>();
try {
@@ -69,7 +80,7 @@ public class ServiceKafkaClient {
String successMsg = "ConnectionTest Successful";
BaseClient.generateResponseDataMap(true, successMsg,
successMsg, null, null, responseData);
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error connecting to Kafka. kafkaClient=" + this, e);
String failureMsg = "Unable to connect to Kafka instance."
+ e.getMessage();
@@ -84,22 +95,41 @@ public class ServiceKafkaClient {
int sessionTimeout = 5000;
int connectionTimeout = 10000;
- ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
- 1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
- try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
- Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator();
- while (iter.hasNext()) {
- String topic = iter.next();
- if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
- ret.add(topic);
+ AdminClient adminClient = null;
+
+ try {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
+ props.put(KEY_SASL_MECHANISM, configs.get(KEY_SASL_MECHANISM));
+ props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs));
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout));
+ props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionTimeout));
+ adminClient = KafkaAdminClient.create(props);
+ ListTopicsResult listTopicsResult = adminClient.listTopics();
+ if (listTopicsResult != null) {
+ Collection<TopicListing> topicListings = listTopicsResult.listings().get();
+ for (TopicListing topicListing : topicListings) {
+ String topicName = topicListing.name();
+ if (ignoreTopicList == null || !ignoreTopicList.contains(topicName)) {
+ ret.add(topicName);
+ }
}
}
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ if (adminClient != null) {
+ adminClient.close();
+ }
}
return ret;
}
+
+
/**
- * @param serviceName
* @param context
* @return
*/
@@ -124,11 +154,11 @@ public class ServiceKafkaClient {
topicList = resourceMap.get(TOPIC_KEY);
}
switch (resource.trim().toLowerCase()) {
- case TOPIC_KEY:
- lookupResource = RESOURCE_TYPE.TOPIC;
- break;
- default:
- break;
+ case TOPIC_KEY:
+ lookupResource = RESOURCE_TYPE.TOPIC;
+ break;
+ default:
+ break;
}
}
@@ -182,7 +212,34 @@ public class ServiceKafkaClient {
+ /**
+ * Describes this client for log messages.
+ * <p>
+ * Only the bootstrap servers and the names of the configured keys are printed.
+ * The configuration values are left out on purpose: this string is written to
+ * the error log by connectionTest(), and the service configuration may carry
+ * credentials that must not end up there.
+ *
+ * @return a short, log-safe description of this client
+ */
@Override
public String toString() {
return "ServiceKafkaClient [serviceName=" + serviceName
- + ", zookeeperConnect=" + zookeeperConnect + "]";
+ + ", bootstrapServers=" + (configs == null ? null : configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))
+ + ", configKeys=" + (configs == null ? null : configs.keySet()) + "]";
+ }
+
+    /**
+     * Reads an integer setting from {@code configs}.
+     * <p>
+     * A {@code null} key, a missing or blank value, and a value that is not a valid
+     * integer all yield {@code defaultValue}. Surrounding whitespace is ignored.
+     * A malformed value is logged and then replaced by the default instead of
+     * surfacing as a {@link NumberFormatException}.
+     *
+     * @param key          configuration key to look up; may be {@code null}
+     * @param defaultValue value returned when no usable setting exists
+     * @return the parsed setting, or {@code defaultValue}
+     */
+    private Integer getIntProperty(String key, int defaultValue) {
+        String rawValue = (key == null || configs == null) ? null : configs.get(key);
+        if (rawValue == null || rawValue.trim().isEmpty()) {
+            return defaultValue;
+        }
+        try {
+            return Integer.valueOf(rawValue.trim());
+        } catch (NumberFormatException e) {
+            LOG.warn("Ignoring non-numeric value '" + rawValue + "' for " + key
+                    + "; using default " + defaultValue);
+            return defaultValue;
+        }
+    }
+
+    /**
+     * Assembles the single-line JAAS entry that is handed to the AdminClient as
+     * {@code sasl.jaas.config}.
+     * <p>
+     * The entry selects the Krb5 login module with keytab login, and takes the
+     * keytab path and principal from {@code kafka.keytab} and {@code kafka.principal}.
+     * Both values are inserted verbatim between double quotes, with no escaping.
+     *
+     * @param configs map supplying the keytab and principal (shadows the field of the same name)
+     * @return the JAAS entry, terminated by {@code ;}
+     */
+    private String getJAASConfig(Map<String,String> configs){
+        final String keytabClause = JAAS_KEYTAB + configs.get(KEY_KAFKA_KEYTAB) + "\"";
+        final String principalClause = JAAS_PRINCIPAL + configs.get(KEY_KAFKA_PRINCIPAL) + "\";";
+        final String jaasEntry = String.join(" ",
+                JAAS_KRB5_MODULE,
+                JAAS_USE_KEYTAB,
+                keytabClause,
+                JAAS_STOKE_KEY,
+                JAAS_USER_TICKET_CACHE,
+                JAAS_SERVICE_NAME,
+                principalClause);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("KafkaClient JAAS: " + jaasEntry);
+        }
+        return jaasEntry;
     }
}
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
index 9e0d6b4..60c55cc 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
@@ -19,20 +19,25 @@
package org.apache.ranger.services.kafka.client;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Map;
public class ServiceKafkaConnectionMgr {
+ // Delimiter placed between key names in the validation error text.
+ private static final String SEPARATOR = ",";
+ // Service-config keys checked by getServiceConfigValidationErrors(); ServiceKafkaClient declares the same literals.
+ private static final String KEY_SASL_MECHANISM = "sasl.mechanism";
+ private static final String KEY_KAFKA_KEYTAB = "kafka.keytab";
+ private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
+ /**
+ * Builds a {@link ServiceKafkaClient} once the service configuration has passed validation.
+ *
+ * @param serviceName name handed to the new client
+ * @param configs service configuration, checked by getServiceConfigValidationErrors() and then handed to the new client
+ * @return a new client for {@code serviceName}
+ * @throws Exception when validation reports at least one missing key; the message lists the keys
+ */
static public ServiceKafkaClient getKafkaClient(String serviceName,
- Map<String, String> configs) throws Exception {
- String zookeeperConnect = configs.get("zookeeper.connect");
- if (zookeeperConnect != null) {
- ServiceKafkaClient serviceKafkaClient = new ServiceKafkaClient(
- serviceName, zookeeperConnect);
- return serviceKafkaClient;
+ Map<String, String> configs) throws Exception {
+ final String missingKeys = getServiceConfigValidationErrors(configs);
+ if (!StringUtils.isBlank(missingKeys)) {
+ throw new Exception("JAAS configuration missing or not correct in Ranger Kafka Service..." + missingKeys);
}
- throw new Exception("Required properties are not set for "
- + serviceName + ". URL or Zookeeper information not provided.");
+ return new ServiceKafkaClient(serviceName, configs);
}
/**
@@ -47,4 +52,50 @@ public class ServiceKafkaConnectionMgr {
return serviceKafkaClient.connectionTest();
}
+    /**
+     * Lists the required service-config keys that have no usable value.
+     * <p>
+     * The required keys are {@code bootstrap.servers}, {@code security.protocol},
+     * {@code sasl.mechanism}, {@code kafka.keytab} and {@code kafka.principal}.
+     * All five are required whatever the value of {@code security.protocol}.
+     * A key counts as missing when its value is {@code null}, empty or only
+     * whitespace. A {@code null} map is treated as having every key missing.
+     *
+     * @param configs service configuration to check; may be {@code null}
+     * @return the missing key names joined by {@code SEPARATOR} in the order above,
+     *         or an empty string when nothing is missing
+     */
+    private static String getServiceConfigValidationErrors(Map<String, String> configs) {
+        final String[] requiredKeys = {
+            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
+            KEY_SASL_MECHANISM,
+            KEY_KAFKA_KEYTAB,
+            KEY_KAFKA_PRINCIPAL
+        };
+
+        StringBuilder ret = new StringBuilder();
+        for (String key : requiredKeys) {
+            String value = (configs == null) ? null : configs.get(key);
+            // isBlank rather than isEmpty: a whitespace-only value is as unusable as none.
+            if (StringUtils.isBlank(value)) {
+                if (ret.length() > 0) {
+                    ret.append(SEPARATOR);
+                }
+                ret.append(key);
+            }
+        }
+        return ret.toString();
+    }
}