You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/04/06 19:10:08 UTC
nifi git commit: NIFI-3528 Added support for keytab/principal to
Kafka 0.10 processors
Repository: nifi
Updated Branches:
refs/heads/master 556f309df -> 614fa6a6c
NIFI-3528 Added support for keytab/principal to Kafka 0.10 processors
This closes #1606.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/614fa6a6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/614fa6a6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/614fa6a6
Branch: refs/heads/master
Commit: 614fa6a6c4272d07c7db6273c15c97c215ad8e7e
Parents: 556f309
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Mar 21 15:48:53 2017 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Apr 6 15:09:49 2017 -0400
----------------------------------------------------------------------
.../kafka/pubsub/KafkaProcessorUtils.java | 64 +++++++++++++++++++-
.../kafka/pubsub/ConsumeKafkaTest.java | 25 ++++++++
2 files changed, 88 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/614fa6a6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index cbe2e24..de28995 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -96,6 +96,24 @@ final class KafkaProcessorUtils {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false)
.build();
+ static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.principal")
+ .displayName("Kerberos Principal")
+ .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.keytab")
+ .displayName("Kerberos Keytab")
+ .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
@@ -109,6 +127,8 @@ final class KafkaProcessorUtils {
BOOTSTRAP_SERVERS,
SECURITY_PROTOCOL,
KERBEROS_PRINCIPLE,
+ USER_PRINCIPAL,
+ USER_KEYTAB,
SSL_CONTEXT_SERVICE
);
}
@@ -131,6 +151,16 @@ final class KafkaProcessorUtils {
+ SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
.build());
}
+
+ String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
+ String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
+ if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
+ || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
+ results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+ .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+ + "must be set.")
+ .build());
+ }
}
//If SSL or SASL_SSL then CS must be set.
@@ -233,7 +263,7 @@ final class KafkaProcessorUtils {
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
- if (propertyValue != null) {
+ if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
// or the standard NiFi time period such as "5 secs"
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
@@ -245,6 +275,38 @@ final class KafkaProcessorUtils {
}
}
}
+
+ String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+ setJaasConfig(mapToPopulate, context);
+ }
+ }
+
+ /**
+ * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
+ * <br />
+ * It expects something with the following format: <br />
+ * <br />
+ * <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>); <br />
+ * ControlFlag = required / requisite / sufficient / optional
+ *
+ * @param mapToPopulate Map of configuration properties
+ * @param context Context
+ */
+ private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+ String keytab = context.getProperty(USER_KEYTAB).getValue();
+ String principal = context.getProperty(USER_PRINCIPAL).getValue();
+ String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
+ if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+ mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ + "useTicketCache=false "
+ + "renewTicket=true "
+ + "serviceName=\"" + serviceName + "\" "
+ + "useKeyTab=true "
+ + "keyTab=\"" + keytab + "\" "
+ + "principal=\"" + principal + "\";");
+ }
}
private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/614fa6a6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9a74c44..9b380d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -164,4 +164,29 @@ public class ConsumeKafkaTest {
verifyNoMoreInteractions(mockLease);
}
+ @Test
+ public void testJaasConfiguration() throws Exception {
+ ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+ runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+ runner.assertValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+ runner.assertValid();
+ }
+
}