Posted to commits@ranger.apache.org by rm...@apache.org on 2020/10/12 16:52:20 UTC

[ranger] branch master updated: RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper

This is an automated email from the ASF dual-hosted git repository.

rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d8ff16  RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
1d8ff16 is described below

commit 1d8ff1603674b967a4372e8963136945432e98f5
Author: Ramesh Mani <rm...@cloudera.com>
AuthorDate: Mon Oct 12 09:17:34 2020 -0700

    RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
---
 .../services/kafka/client/ServiceKafkaClient.java  | 119 +++++++++++++++------
 .../kafka/client/ServiceKafkaConnectionMgr.java    |  67 ++++++++++--
 2 files changed, 147 insertions(+), 39 deletions(-)
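
The core of the change is replacing the ZooKeeper-based topic lookup (KafkaZkClient / ZooKeeperClient) with Kafka's AdminClient API. As a point of reference, here is a minimal, hypothetical standalone sketch of that lookup path against an unsecured broker; the class name and the localhost:9092 bootstrap address are placeholders, not part of this patch.

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicListing;

    public class TopicListingSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder broker address; in Ranger this comes from the service config.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // AdminClient is AutoCloseable, so try-with-resources closes it for us.
            try (AdminClient adminClient = AdminClient.create(props)) {
                // listTopics() is asynchronous; listings().get() blocks until the
                // metadata response arrives (or the request times out).
                for (TopicListing topicListing : adminClient.listTopics().listings().get()) {
                    System.out.println(topicListing.name());
                }
            }
        }
    }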

diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 91a7d27..1144081 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -19,25 +19,25 @@
 
 package org.apache.ranger.services.kafka.client;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.util.TimedEventUtil;
 
-import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
-import scala.Option;
-import scala.collection.Iterator;
-
 public class ServiceKafkaClient {
 	private static final Logger LOG = Logger.getLogger(ServiceKafkaClient.class);
 
@@ -45,21 +45,32 @@ public class ServiceKafkaClient {
 		TOPIC
 	}
 
-	String serviceName = null;
-	String zookeeperConnect = null;
+	String serviceName;
+	Map<String, String> configs;
 	private static final String errMessage = " You can still save the repository and start creating "
 			+ "policies, but you would not be able to use autocomplete for "
 			+ "resource names. Check server logs for more info.";
 
-	private static final String TOPIC_KEY = "topic";
-	private static final long LOOKUP_TIMEOUT_SEC = 5;
-
-	public ServiceKafkaClient(String serviceName, String zookeeperConnect) {
+	private static final String TOPIC_KEY				= "topic";
+	private static final long   LOOKUP_TIMEOUT_SEC		= 5;
+	private static final String KEY_SASL_MECHANISM		= "sasl.mechanism";
+	private static final String KEY_SASL_JAAS_CONFIG	= "sasl.jaas.config";
+	private static final String KEY_KAFKA_KEYTAB		= "kafka.keytab";
+	private static final String KEY_KAFKA_PRINCIPAL		= "kafka.principal";
+	private static final String JAAS_KRB5_MODULE		= "com.sun.security.auth.module.Krb5LoginModule required";
+	private static final String JAAS_USE_KEYTAB			= "useKeyTab=true";
+	private static final String JAAS_KEYTAB				= "keyTab=\"";
+	private static final String JAAS_STORE_KEY			= "storeKey=true";
+	private static final String JAAS_SERVICE_NAME		= "serviceName=kafka";
+	private static final String JAAS_USE_TICKET_CACHE	= "useTicketCache=false";
+	private static final String JAAS_PRINCIPAL			= "principal=\"";
+
+	public ServiceKafkaClient(String serviceName, Map<String,String> configs) {
 		this.serviceName = serviceName;
-		this.zookeeperConnect = zookeeperConnect;
+		this.configs = configs;
 	}
 
-	public Map<String, Object> connectionTest() throws Exception {
+	public Map<String, Object> connectionTest() {
 		String errMsg = errMessage;
 		Map<String, Object> responseData = new HashMap<String, Object>();
 		try {
@@ -69,7 +80,7 @@ public class ServiceKafkaClient {
 			String successMsg = "ConnectionTest Successful";
 			BaseClient.generateResponseDataMap(true, successMsg,
 					successMsg, null, null, responseData);
-		} catch (IOException e) {
+		} catch (Exception e) {
 			LOG.error("Error connecting to Kafka. kafkaClient=" + this, e);
 			String failureMsg = "Unable to connect to Kafka instance."
 					+ e.getMessage();
@@ -84,22 +95,41 @@ public class ServiceKafkaClient {
 
 		int sessionTimeout = 5000;
 		int connectionTimeout = 10000;
-		ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
-				1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
-		try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
-			Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator();
-			while (iter.hasNext()) {
-				String topic = iter.next();
-				if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
-					ret.add(topic);
+		AdminClient adminClient = null;
+
+		try {
+			Properties props = new Properties();
+			props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+			props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
+			props.put(KEY_SASL_MECHANISM, configs.get(KEY_SASL_MECHANISM));
+			props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs));
+			props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout));
+			props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionTimeout));
+			adminClient = KafkaAdminClient.create(props);
+			ListTopicsResult listTopicsResult = adminClient.listTopics();
+			if (listTopicsResult != null) {
+				Collection<TopicListing> topicListings = listTopicsResult.listings().get();
+				for (TopicListing topicListing : topicListings) {
+					String topicName = topicListing.name();
+					if (ignoreTopicList == null || !ignoreTopicList.contains(topicName)) {
+						ret.add(topicName);
+					}
 				}
 			}
+		} catch (Exception e) {
+			LOG.error("Failed to get topic list from Kafka AdminClient. kafkaClient=" + this, e);
+			throw e;
+		} finally {
+			if (adminClient != null) {
+				adminClient.close();
+			}
 		}
 		return ret;
 	}
 
 	/**
-	 * @param serviceName
 	 * @param context
 	 * @return
 	 */
@@ -124,11 +154,11 @@ public class ServiceKafkaClient {
 				topicList = resourceMap.get(TOPIC_KEY);
 			}
 			switch (resource.trim().toLowerCase()) {
-			case TOPIC_KEY:
-				lookupResource = RESOURCE_TYPE.TOPIC;
-				break;
-			default:
-				break;
+				case TOPIC_KEY:
+					lookupResource = RESOURCE_TYPE.TOPIC;
+					break;
+				default:
+					break;
 			}
 		}
 
@@ -182,7 +212,34 @@ public class ServiceKafkaClient {
 	@Override
 	public String toString() {
 		return "ServiceKafkaClient [serviceName=" + serviceName
-				+ ", zookeeperConnect=" + zookeeperConnect + "]";
+				+ ", configs=" + configs + "]";
+	}
+
+	private Integer getIntProperty(String key, int defaultValue) {
+		if (key == null) {
+			return defaultValue;
+		}
+		String rtrnVal = configs.get(key);
+		if (rtrnVal == null) {
+			return defaultValue;
+		}
+		return Integer.valueOf(rtrnVal);
+	}
+
+	private String getJAASConfig(Map<String,String> configs){
+		String jaasConfig =  new StringBuilder()
+				.append(JAAS_KRB5_MODULE).append(" ")
+				.append(JAAS_USE_KEYTAB).append(" ")
+				.append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append(" ")
+				.append(JAAS_STORE_KEY).append(" ")
+				.append(JAAS_USE_TICKET_CACHE).append(" ")
+				.append(JAAS_SERVICE_NAME).append(" ")
+				.append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";")
+				.toString();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("KafkaClient JAAS: " + jaasConfig);
+		}
+		return jaasConfig;
 	}
 
 }
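
On secured clusters the new client no longer depends on an externally supplied JAAS file: getJAASConfig() assembles the sasl.jaas.config entry from the kafka.keytab and kafka.principal service configs. Below is a rough sketch of the AdminClient properties this amounts to, assuming SASL_SSL with the GSSAPI mechanism; the helper class, broker address, keytab path, and principal are illustrative placeholders only.

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class SecuredAdminClientPropsSketch {
        public static Properties build() {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1.example.com:9093"); // placeholder
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");                  // illustrative
            props.put("sasl.mechanism", "GSSAPI");                                              // illustrative
            // Same shape as the string getJAASConfig() concatenates, with placeholder
            // keytab and principal values.
            props.put("sasl.jaas.config",
                    "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useKeyTab=true "
                    + "keyTab=\"/etc/security/keytabs/rangerlookup.keytab\" "
                    + "storeKey=true "
                    + "useTicketCache=false "
                    + "serviceName=kafka "
                    + "principal=\"rangerlookup@EXAMPLE.COM\";");
            return props;
        }
    }
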
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
index 9e0d6b4..60c55cc 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
@@ -19,20 +19,25 @@
 
 package org.apache.ranger.services.kafka.client;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import java.util.Map;
 
 public class ServiceKafkaConnectionMgr {
+	private static final String SEPARATOR			= ",";
+	private static final String KEY_SASL_MECHANISM	= "sasl.mechanism";
+	private static final String KEY_KAFKA_KEYTAB    = "kafka.keytab";
+	private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
 
 	static public ServiceKafkaClient getKafkaClient(String serviceName,
-			Map<String, String> configs) throws Exception {
-		String zookeeperConnect = configs.get("zookeeper.connect");
-		if (zookeeperConnect != null) {
-			ServiceKafkaClient serviceKafkaClient = new ServiceKafkaClient(
-					serviceName, zookeeperConnect);
-			return serviceKafkaClient;
+													Map<String, String> configs) throws Exception {
+		String error = getServiceConfigValidationErrors(configs);
+		if (StringUtils.isNotBlank(error)){
+			error = "Required Kafka service configuration is missing or incorrect: " + error;
+			throw new Exception(error);
 		}
-		throw new Exception("Required properties are not set for "
-				+ serviceName + ". URL or Zookeeper information not provided.");
+		ServiceKafkaClient serviceKafkaClient = new ServiceKafkaClient(serviceName, configs);
+		return serviceKafkaClient;
 	}
 
 	/**
@@ -47,4 +52,50 @@ public class ServiceKafkaConnectionMgr {
 		return serviceKafkaClient.connectionTest();
 	}
 
+	private static String  getServiceConfigValidationErrors(Map<String, String> configs) {
+		StringBuilder ret = new StringBuilder();
+
+		String bootstrap_servers = configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+		String security_protocol = configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+		String sasl_mechanism = configs.get(KEY_SASL_MECHANISM);
+		String kafka_keytab = configs.get(KEY_KAFKA_KEYTAB);
+		String kafka_principal = configs.get(KEY_KAFKA_PRINCIPAL);
+
+		if (StringUtils.isEmpty(bootstrap_servers)) {
+			ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+		}
+
+		if (StringUtils.isEmpty(security_protocol)) {
+			if (StringUtils.isNotBlank(ret.toString())) {
+				ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+			} else {
+				ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+			}
+		}
+
+		if (StringUtils.isEmpty(sasl_mechanism)) {
+			if (StringUtils.isNotBlank(ret.toString())) {
+				ret.append(SEPARATOR).append(KEY_SASL_MECHANISM);
+			} else {
+				ret.append(KEY_SASL_MECHANISM);
+			}
+		}
+
+		if (StringUtils.isEmpty(kafka_keytab)) {
+			if (StringUtils.isNotBlank(ret.toString())) {
+				ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB);
+			} else {
+				ret.append(KEY_KAFKA_KEYTAB);
+			}
+		}
+
+		if (StringUtils.isEmpty(kafka_principal)) {
+			if (StringUtils.isNotBlank(ret.toString())) {
+				ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL);
+			} else {
+				ret.append(KEY_KAFKA_PRINCIPAL);
+			}
+		}
+		return ret.toString();
+	}
 }
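
On the connection-manager side the change fails fast: getServiceConfigValidationErrors() insists on bootstrap.servers, security.protocol, sasl.mechanism, kafka.keytab, and kafka.principal before a client is created. A hypothetical end-to-end usage sketch under that assumption, with placeholder service name and config values:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.ranger.services.kafka.client.ServiceKafkaClient;
    import org.apache.ranger.services.kafka.client.ServiceKafkaConnectionMgr;

    public class KafkaServiceConfigSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> configs = new HashMap<>();
            // The five keys the new validation requires; all values below are placeholders.
            configs.put("bootstrap.servers", "broker-1.example.com:9093");
            configs.put("security.protocol", "SASL_SSL");
            configs.put("sasl.mechanism", "GSSAPI");
            configs.put("kafka.keytab", "/etc/security/keytabs/rangerlookup.keytab");
            configs.put("kafka.principal", "rangerlookup@EXAMPLE.COM");

            // getKafkaClient() throws if any of the required keys is missing or empty.
            ServiceKafkaClient client = ServiceKafkaConnectionMgr.getKafkaClient("kafkadev", configs);
            Map<String, Object> response = client.connectionTest();
            System.out.println(response);
        }
    }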