You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/12 08:14:29 UTC
kafka git commit: KAFKA-3077: Enable KafkaLog4jAppender to work with
SASL enabled brokers
Repository: kafka
Updated Branches:
refs/heads/trunk c9114488b -> 2adeb214b
KAFKA-3077: Enable KafkaLog4jAppender to work with SASL enabled brokers
Author: Ashish Singh <as...@cloudera.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #740 from SinghAsDev/KAFKA-3077
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2adeb214
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2adeb214
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2adeb214
Branch: refs/heads/trunk
Commit: 2adeb214b1e366e36deef045f8049406f7b3773d
Parents: c911448
Author: Ashish Singh <as...@cloudera.com>
Authored: Mon Jan 11 23:14:14 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jan 11 23:14:14 2016 -0800
----------------------------------------------------------------------
.../kafka/log4jappender/KafkaLog4jAppender.java | 42 +++++++++++++++++++-
1 file changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2adeb214/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index dbbee3c..5759105 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
@@ -51,6 +52,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+ private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
private String brokerList = null;
private String topic = null;
@@ -61,6 +63,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String sslKeystoreType = null;
private String sslKeystoreLocation = null;
private String sslKeystorePassword = null;
+ private String saslKerberosServiceName = null;
+ private String clientJaasConfPath = null;
+ private String kerb5ConfPath = null;
private int retries = 0;
private int requiredNumAcks = Integer.MAX_VALUE;
@@ -155,6 +160,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.sslKeystoreLocation = sslKeystoreLocation;
}
+ public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+ this.saslKerberosServiceName = saslKerberosServiceName;
+ }
+
+ public void setClientJaasConfPath(String clientJaasConfPath) {
+ this.clientJaasConfPath = clientJaasConfPath;
+ }
+
+ public void setKerb5ConfPath(String kerb5ConfPath) {
+ this.kerb5ConfPath = kerb5ConfPath;
+ }
+
public String getSslKeystoreLocation() {
return sslKeystoreLocation;
}
@@ -167,6 +184,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
return sslKeystorePassword;
}
+ public String getSaslKerberosServiceName() {
+ return saslKerberosServiceName;
+ }
+
+ public String getClientJaasConfPath() {
+ return clientJaasConfPath;
+ }
+
+ public String getKerb5ConfPath() {
+ return kerb5ConfPath;
+ }
+
@Override
public void activateOptions() {
// check for config parameter validity
@@ -183,9 +212,11 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
if (retries > 0)
props.put(RETRIES_CONFIG, retries);
- if (securityProtocol != null && sslTruststoreLocation != null &&
- sslTruststorePassword != null) {
+ if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL, securityProtocol);
+ }
+ if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null &&
+ sslTruststorePassword != null) {
props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);
@@ -196,6 +227,13 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
}
}
+ if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
+ props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
+ System.setProperty("java.security.auth.login.config", clientJaasConfPath);
+ if (kerb5ConfPath != null) {
+ System.setProperty("java.security.krb5.conf", kerb5ConfPath);
+ }
+ }
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");