You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/12 08:14:29 UTC

kafka git commit: KAFKA-3077: Enable KafkaLog4jAppender to work with SASL enabled brokers

Repository: kafka
Updated Branches:
  refs/heads/trunk c9114488b -> 2adeb214b


KAFKA-3077: Enable KafkaLog4jAppender to work with SASL enabled brokers

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #740 from SinghAsDev/KAFKA-3077


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2adeb214
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2adeb214
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2adeb214

Branch: refs/heads/trunk
Commit: 2adeb214b1e366e36deef045f8049406f7b3773d
Parents: c911448
Author: Ashish Singh <as...@cloudera.com>
Authored: Mon Jan 11 23:14:14 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jan 11 23:14:14 2016 -0800

----------------------------------------------------------------------
 .../kafka/log4jappender/KafkaLog4jAppender.java | 42 +++++++++++++++++++-
 1 file changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2adeb214/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index dbbee3c..5759105 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
@@ -51,6 +52,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
     private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
     private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
     private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+    private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
 
     private String brokerList = null;
     private String topic = null;
@@ -61,6 +63,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
     private String sslKeystoreType = null;
     private String sslKeystoreLocation = null;
     private String sslKeystorePassword = null;
+    private String saslKerberosServiceName = null;
+    private String clientJaasConfPath = null;
+    private String kerb5ConfPath = null;
 
     private int retries = 0;
     private int requiredNumAcks = Integer.MAX_VALUE;
@@ -155,6 +160,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.sslKeystoreLocation = sslKeystoreLocation;
     }
 
+    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+        this.saslKerberosServiceName = saslKerberosServiceName;
+    }
+
+    public void setClientJaasConfPath(String clientJaasConfPath) {
+        this.clientJaasConfPath = clientJaasConfPath;
+    }
+
+    public void setKerb5ConfPath(String kerb5ConfPath) {
+        this.kerb5ConfPath = kerb5ConfPath;
+    }
+
     public String getSslKeystoreLocation() {
         return sslKeystoreLocation;
     }
@@ -167,6 +184,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         return sslKeystorePassword;
     }
 
+    public String getSaslKerberosServiceName() {
+        return saslKerberosServiceName;
+    }
+
+    public String getClientJaasConfPath() {
+        return clientJaasConfPath;
+    }
+
+    public String getKerb5ConfPath() {
+        return kerb5ConfPath;
+    }
+
     @Override
     public void activateOptions() {
         // check for config parameter validity
@@ -183,9 +212,11 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
             props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
         if (retries > 0)
             props.put(RETRIES_CONFIG, retries);
-        if (securityProtocol != null && sslTruststoreLocation != null &&
-            sslTruststorePassword != null) {
+        if (securityProtocol != null) {
             props.put(SECURITY_PROTOCOL, securityProtocol);
+        }
+        if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null &&
+            sslTruststorePassword != null) {
             props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
             props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);
 
@@ -196,6 +227,13 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
                 props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
             }
         }
+        if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
+            props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
+            System.setProperty("java.security.auth.login.config", clientJaasConfPath);
+            if (kerb5ConfPath != null) {
+                System.setProperty("java.security.krb5.conf", kerb5ConfPath);
+            }
+        }
 
         props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");