You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/10/15 18:06:21 UTC
[nifi] branch master updated: NIFI-4820 This closes #3813.
Improving security configuration for Kafka 2.0 processors
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d570fe8 NIFI-4820 This closes #3813. Improving security configuration for Kafka 2.0 processors
d570fe8 is described below
commit d570fe81543b706acf9a797fb05fac68a384b3f4
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Mon Oct 14 14:48:49 2019 -0400
NIFI-4820 This closes #3813. Improving security configuration for Kafka 2.0 processors
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 4 +
.../kafka/pubsub/KafkaProcessorUtils.java | 159 ++++++++++++++++++---
.../kafka/pubsub/PublishKafkaRecord_2_0.java | 4 +
.../kafka/pubsub/TestConsumeKafkaRecord_2_0.java | 56 +++++++-
4 files changed, 203 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index fb31e32..7f5c75a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -232,10 +232,14 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
descriptors.add(RECORD_WRITER);
descriptors.add(HONOR_TRANSACTIONS);
descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+ descriptors.add(KafkaProcessorUtils.SASL_MECHANISM);
descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
+ descriptors.add(KafkaProcessorUtils.USERNAME);
+ descriptors.add(KafkaProcessorUtils.PASSWORD);
+ descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
descriptors.add(AUTO_OFFSET_RESET);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bec65d0..5c2fa68 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -16,20 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -52,6 +38,20 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
final class KafkaProcessorUtils {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
@@ -69,11 +69,26 @@ final class KafkaProcessorUtils {
static final String KAFKA_OFFSET = "kafka.offset";
static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
+
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
+ static final String GSSAPI_VALUE = "GSSAPI";
+ static final AllowableValue SASL_MECHANISM_GSSAPI = new AllowableValue(GSSAPI_VALUE, GSSAPI_VALUE,
+ "The mechanism for authentication via Kerberos. The principal and keytab must be provided to the processor " +
+ "by using a Keytab Credential service, or by specifying the properties directly in the processor.");
+
+ static final String PLAIN_VALUE = "PLAIN";
+ static final AllowableValue SASL_MECHANISM_PLAIN = new AllowableValue(PLAIN_VALUE, PLAIN_VALUE,
+ "The mechanism for authentication via username and password. The username and password properties must " +
+ "be populated when using this mechanism.");
+
+ static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
+ static final AllowableValue SASL_MECHANISM_SCRAM = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism. " +
+ "The username and password properties must be set when using this mechanism.");
+
static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.displayName("Kafka Brokers")
@@ -92,6 +107,15 @@ final class KafkaProcessorUtils {
.allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
.defaultValue(SEC_PLAINTEXT.getValue())
.build();
+ static final PropertyDescriptor SASL_MECHANISM = new PropertyDescriptor.Builder()
+ .name("sasl.mechanism")
+ .displayName("SASL Mechanism")
+ .description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM)
+ .defaultValue(GSSAPI_VALUE)
+ .build();
static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
.name("sasl.kerberos.service.name")
.displayName("Kerberos Service Name")
@@ -121,6 +145,31 @@ final class KafkaProcessorUtils {
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+ .name("sasl.username")
+ .displayName("Username")
+ .description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+ static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+ .name("sasl.password")
+ .displayName("Password")
+ .description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+ static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder()
+ .name("sasl.token.auth")
+ .displayName("Token Auth")
+ .description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + ", this property indicates if token authentication should be used.")
+ .required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
@@ -140,10 +189,14 @@ final class KafkaProcessorUtils {
return Arrays.asList(
BOOTSTRAP_SERVERS,
SECURITY_PROTOCOL,
+ SASL_MECHANISM,
JAAS_SERVICE_NAME,
KERBEROS_CREDENTIALS_SERVICE,
USER_PRINCIPAL,
USER_KEYTAB,
+ USERNAME,
+ PASSWORD,
+ TOKEN_AUTH,
SSL_CONTEXT_SERVICE
);
}
@@ -151,7 +204,8 @@ final class KafkaProcessorUtils {
static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>();
- String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+ final String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+ final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();
final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
@@ -185,9 +239,10 @@ final class KafkaProcessorUtils {
.build());
}
- // validates that if one of SASL (Kerberos) option is selected for
- // security protocol, then Kerberos principal is provided as well
- if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+ // validates that if the SASL mechanism is GSSAPI (kerberos) AND one of the SASL options is selected
+ // for security protocol, then Kerberos principal is provided as well
+ if (SASL_MECHANISM_GSSAPI.getValue().equals(saslMechanism)
+ && (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol))) {
String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
@@ -207,6 +262,29 @@ final class KafkaProcessorUtils {
}
}
+ // validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided
+ if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) || SASL_MECHANISM_SCRAM.getValue().equals(saslMechanism)) {
+ final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+ if (StringUtils.isBlank(username)) {
+ results.add(new ValidationResult.Builder()
+ .subject(USERNAME.getDisplayName())
+ .valid(false)
+ .explanation("A username is required when " + SASL_MECHANISM.getDisplayName()
+ + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+ .build());
+ }
+
+ final String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ if (StringUtils.isBlank(password)) {
+ results.add(new ValidationResult.Builder()
+ .subject(PASSWORD.getDisplayName())
+ .valid(false)
+ .explanation("A password is required when " + SASL_MECHANISM.getDisplayName()
+ + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+ .build());
+ }
+ }
+
// If SSL or SASL_SSL then SSLContext Controller Service must be set.
final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
@@ -356,6 +434,23 @@ final class KafkaProcessorUtils {
* @param context Context
*/
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+ final String saslMechanism = context.getProperty(SASL_MECHANISM).getValue();
+ switch (saslMechanism) {
+ case GSSAPI_VALUE:
+ setGssApiJaasConfig(mapToPopulate, context);
+ break;
+ case PLAIN_VALUE:
+ setPlainJaasConfig(mapToPopulate, context);
+ break;
+ case SCRAM_SHA256_VALUE:
+ setScramJaasConfig(mapToPopulate, context);
+ break;
+ default:
+ throw new IllegalStateException("Unknown " + SASL_MECHANISM.getDisplayName() + ": " + saslMechanism);
+ }
+ }
+
+ private static void setGssApiJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
@@ -369,7 +464,7 @@ final class KafkaProcessorUtils {
String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
- if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+ if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false "
+ "renewTicket=true "
@@ -380,6 +475,32 @@ final class KafkaProcessorUtils {
}
}
+ private static void setPlainJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
+ final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+ final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+ mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ + "username=\"" + username + "\" "
+ + "password=\"" + password + "\";");
+ }
+
+ private static void setScramJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
+ final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+ final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+ final StringBuilder builder = new StringBuilder("org.apache.kafka.common.security.scram.ScramLoginModule required ")
+ .append("username=\"" + username + "\" ")
+ .append("password=\"" + password + "\"");
+
+ final Boolean tokenAuth = context.getProperty(TOKEN_AUTH).asBoolean();
+ if (tokenAuth != null && tokenAuth) {
+ builder.append(" tokenauth=\"true\"");
+ }
+
+ builder.append(";");
+ mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, builder.toString());
+ }
+
private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 414523b..628d4ad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -266,10 +266,14 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
properties.add(ATTRIBUTE_NAME_REGEX);
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+ properties.add(KafkaProcessorUtils.SASL_MECHANISM);
properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
properties.add(KafkaProcessorUtils.USER_KEYTAB);
+ properties.add(KafkaProcessorUtils.USERNAME);
+ properties.add(KafkaProcessorUtils.PASSWORD);
+ properties.add(KafkaProcessorUtils.TOKEN_AUTH);
properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
properties.add(MESSAGE_KEY_FIELD);
properties.add(MAX_REQUEST_SIZE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
index dc5331d..667940f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -192,7 +192,7 @@ public class TestConsumeKafkaRecord_2_0 {
}
@Test
- public void testJaasConfiguration() throws Exception {
+ public void testJaasConfigurationWithDefaultMechanism() {
runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
@@ -213,4 +213,58 @@ public class TestConsumeKafkaRecord_2_0 {
runner.assertValid();
}
+ @Test
+ public void testJaasConfigurationWithPlainMechanism() {
+ runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.PLAIN_VALUE);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+ runner.assertValid();
+
+ runner.removeProperty(KafkaProcessorUtils.USERNAME);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testJaasConfigurationWithScramMechanism() {
+ runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA256_VALUE);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+ runner.assertValid();
+
+ runner.removeProperty(KafkaProcessorUtils.USERNAME);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNonSaslSecurityProtocol() {
+ runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_PLAINTEXT);
+ runner.assertValid();
+ }
+
}