You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/04/06 19:10:08 UTC

nifi git commit: NIFI-3528 Added support for keytab/principal to Kafka 0.10 processors

Repository: nifi
Updated Branches:
  refs/heads/master 556f309df -> 614fa6a6c


NIFI-3528 Added support for keytab/principal to Kafka 0.10 processors

This closes #1606.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/614fa6a6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/614fa6a6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/614fa6a6

Branch: refs/heads/master
Commit: 614fa6a6c4272d07c7db6273c15c97c215ad8e7e
Parents: 556f309
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Mar 21 15:48:53 2017 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Apr 6 15:09:49 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/KafkaProcessorUtils.java       | 64 +++++++++++++++++++-
 .../kafka/pubsub/ConsumeKafkaTest.java          | 25 ++++++++
 2 files changed, 88 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/614fa6a6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index cbe2e24..de28995 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -96,6 +96,24 @@ final class KafkaProcessorUtils {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
+    static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
+            .name("sasl.kerberos.principal") // handled separately from the generic property loop; used to build 'sasl.jaas.config'
+            .displayName("Kerberos Principal")
+            .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+                    + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+            .required(false) // optional, but validation requires it whenever the Kerberos Keytab is set (and vice versa)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
+            .name("sasl.kerberos.keytab") // handled separately from the generic property loop; used to build 'sasl.jaas.config'
+            .displayName("Kerberos Keytab")
+            .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+                    + "in the JVM properties defined in the bootstrap.conf file. This keytab will be set into 'sasl.jaas.config' Kafka's property.")
+            .required(false) // optional, but validation requires it whenever the Kerberos Principal is set (and vice versa)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("ssl.context.service")
             .displayName("SSL Context Service")
@@ -109,6 +127,8 @@ final class KafkaProcessorUtils {
                 BOOTSTRAP_SERVERS,
                 SECURITY_PROTOCOL,
                 KERBEROS_PRINCIPLE,
+                USER_PRINCIPAL,
+                USER_KEYTAB,
                 SSL_CONTEXT_SERVICE
         );
     }
@@ -131,6 +151,16 @@ final class KafkaProcessorUtils {
                                 + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
                         .build());
             }
+
+            String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
+            String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
+            if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
+                    || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
+                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("Both <" + USER_KEYTAB.getDisplayName()  + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                                + "must be set.")
+                        .build());
+            }
         }
 
         //If SSL or SASL_SSL then CS must be set.
@@ -233,7 +263,7 @@ final class KafkaProcessorUtils {
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
 
-            if (propertyValue != null) {
+            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
                 // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
                 // or the standard NiFi time period such as "5 secs"
                 if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
@@ -245,6 +275,38 @@ final class KafkaProcessorUtils {
                 }
             }
         }
+
+        String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            setJaasConfig(mapToPopulate, context);
+        }
+    }
+
+    /**
+     * Populates Kafka's 'sasl.jaas.config' property (KAFKA-4259 / KIP-85) with a Krb5LoginModule entry built from the
+     * processor's keytab, principal and service name properties. Nothing is written unless all three are non-blank.<br />
+     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
+     * Expected format: &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
+     * ControlFlag = required / requisite / sufficient / optional
+     *
+     * @param mapToPopulate Map of configuration properties; receives the 'sasl.jaas.config' entry
+     * @param context Context the USER_KEYTAB, USER_PRINCIPAL and KERBEROS_PRINCIPLE values are read from
+     */
+    private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+        String keytab = context.getProperty(USER_KEYTAB).getValue();
+        String principal = context.getProperty(USER_PRINCIPAL).getValue();
+        String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+            // Values are emitted as double-quoted strings: escape backslashes (e.g. Windows keytab paths) and quotes so
+            // they survive parsing. NOTE(review): assumes Kafka's JAAS parser honours backslash escapes in quotes - confirm.
+            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+                    + "useTicketCache=false "
+                    + "renewTicket=true "
+                    + "serviceName=\"" + serviceName.replace("\\", "\\\\").replace("\"", "\\\"") + "\" "
+                    + "useKeyTab=true "
+                    + "keyTab=\"" + keytab.replace("\\", "\\\\").replace("\"", "\\\"") + "\" "
+                    + "principal=\"" + principal.replace("\\", "\\\\").replace("\"", "\\\"") + "\";");
+        }
     }
 
     private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/614fa6a6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9a74c44..9b380d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -164,4 +164,29 @@ public class ConsumeKafkaTest {
         verifyNoMoreInteractions(mockLease);
     }
 
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        // Baseline: a consumer configured with bootstrap servers, topic, group id and offset reset only.
+        final TestRunner testRunner = TestRunners.newTestRunner(new ConsumeKafka_0_10());
+        testRunner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        testRunner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
+        testRunner.setProperty(ConsumeKafka_0_10.GROUP_ID, "foo");
+        testRunner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        // Selecting SASL_PLAINTEXT on its own is expected to fail validation.
+        testRunner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        testRunner.assertNotValid();
+        // Supplying KERBEROS_PRINCIPLE (the value used as the JAAS serviceName) makes the configuration valid.
+        testRunner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        testRunner.assertValid();
+        // A principal without a keytab is rejected: the two must be set together.
+        testRunner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        testRunner.assertNotValid();
+        // A keytab value that does not point to an existing file is still invalid.
+        testRunner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        testRunner.assertNotValid();
+        // With an existing file as the keytab (and the principal already set) the configuration is valid.
+        testRunner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        testRunner.assertValid();
+    }
+
 }