You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/12/01 16:30:07 UTC
[nifi] branch main updated: NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components
This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c3b0e1a790 NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components
c3b0e1a790 is described below
commit c3b0e1a790832cdbb9bc5bec336dc27859313c0e
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 1 08:15:20 2022 -0600
NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components
This closes #6743
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../login/DelegatingLoginConfigProvider.java | 2 +-
.../nifi/kafka/shared/property/SaslMechanism.java | 10 +++
.../StandardKafkaPropertyNameProvider.java | 16 +++--
.../provider/StandardKafkaPropertyProvider.java | 2 +-
.../login/DelegatingLoginConfigProviderTest.java | 69 +++++++++++++++++++
.../StandardKafkaPropertyProviderTest.java | 80 ++++++++++++++++++++++
6 files changed, 171 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index c9b81bc595..2be8274606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -47,7 +47,7 @@ public class DelegatingLoginConfigProvider implements LoginConfigProvider {
@Override
public String getConfiguration(final PropertyContext context) {
final String saslMechanismProperty = context.getProperty(KafkaClientComponent.SASL_MECHANISM).getValue();
- final SaslMechanism saslMechanism = SaslMechanism.valueOf(saslMechanismProperty);
+ final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(saslMechanismProperty);
final LoginConfigProvider loginConfigProvider = PROVIDERS.getOrDefault(saslMechanism, SCRAM_PROVIDER);
return loginConfigProvider.getConfiguration(context);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index 619daeb86a..a9da7714e3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -18,6 +18,9 @@ package org.apache.nifi.kafka.shared.property;
import org.apache.nifi.components.DescribedValue;
+import java.util.Arrays;
+import java.util.Optional;
+
/**
* Enumeration of supported Kafka SASL Mechanisms
*/
@@ -42,6 +45,13 @@ public enum SaslMechanism implements DescribedValue {
this.description = description;
}
+ public static SaslMechanism getSaslMechanism(final String value) {
+ final Optional<SaslMechanism> foundSaslMechanism = Arrays.stream(SaslMechanism.values())
+ .filter(saslMechanism -> saslMechanism.getValue().equals(value))
+ .findFirst();
+ return foundSaslMechanism.orElseThrow(() -> new IllegalArgumentException(String.format("SaslMechanism value [%s] not found", value)));
+ }
+
@Override
public String getValue() {
return value;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
index 10979716a4..2083bfc7e7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -58,9 +59,12 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
final Set<String> propertyNames = new LinkedHashSet<>();
for (final String propertyClassName : PROPERTY_CLASSES) {
- final Class<?> propertyClass = getClass(propertyClassName);
- final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
- propertyNames.addAll(classPropertyNames);
+ final Optional<Class<?>> propertyClassFound = findClass(propertyClassName);
+ if (propertyClassFound.isPresent()) {
+ final Class<?> propertyClass = propertyClassFound.get();
+ final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
+ propertyNames.addAll(classPropertyNames);
+ }
}
return propertyNames;
@@ -93,11 +97,11 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
}
}
- private static Class<?> getClass(final String className) {
+ private static Optional<Class<?>> findClass(final String className) {
try {
- return Class.forName(className);
+ return Optional.of(Class.forName(className));
} catch (final ClassNotFoundException e) {
- throw new IllegalStateException("Kafka Configuration Class not found", e);
+ return Optional.empty();
}
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index a9ae4d6320..fd06acb6d0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -83,7 +83,7 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider {
final String loginConfig = LOGIN_CONFIG_PROVIDER.getConfiguration(context);
properties.put(SASL_JAAS_CONFIG.getProperty(), loginConfig);
- final SaslMechanism saslMechanism = SaslMechanism.valueOf(context.getProperty(SASL_MECHANISM).getValue());
+ final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue());
if (SaslMechanism.GSSAPI == saslMechanism && isCustomKerberosLoginFound()) {
properties.put(SASL_LOGIN_CLASS.getProperty(), SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java
new file mode 100644
index 0000000000..fef038a0ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DelegatingLoginConfigProviderTest {
+
+ private static final String PLAIN_LOGIN_MODULE = "PlainLoginModule";
+
+ private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
+
+ DelegatingLoginConfigProvider provider;
+
+ TestRunner runner;
+
+ @BeforeEach
+ void setProvider() {
+ provider = new DelegatingLoginConfigProvider();
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.setValidateExpressionUsage(false);
+ }
+
+ @Test
+ void testGetConfigurationPlain() {
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.PLAIN.getValue());
+ final PropertyContext propertyContext = runner.getProcessContext();
+
+ final String configuration = provider.getConfiguration(propertyContext);
+
+ assertNotNull(configuration);
+ assertTrue(configuration.contains(PLAIN_LOGIN_MODULE), "PLAIN configuration not found");
+ }
+
+ @Test
+ void testGetConfigurationScram() {
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_512.getValue());
+ final PropertyContext propertyContext = runner.getProcessContext();
+
+ final String configuration = provider.getConfiguration(propertyContext);
+
+ assertNotNull(configuration);
+ assertTrue(configuration.contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
new file mode 100644
index 0000000000..bce3412186
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.property.provider;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class StandardKafkaPropertyProviderTest {
+
+ private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
+
+ StandardKafkaPropertyProvider provider;
+
+ TestRunner runner;
+
+ @BeforeEach
+ void setProvider() {
+ provider = new StandardKafkaPropertyProvider(String.class);
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.setValidateExpressionUsage(false);
+ }
+
+ @Test
+ void testGetProperties() {
+ final SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+
+ runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
+ final PropertyContext propertyContext = runner.getProcessContext();
+
+ final Map<String, Object> properties = provider.getProperties(propertyContext);
+
+ assertEquals(securityProtocol.name(), properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName()));
+ }
+
+ @Test
+ void testGetPropertiesSaslMechanismScram() {
+ final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+
+ runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
+ runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_256.getValue());
+ final PropertyContext propertyContext = runner.getProcessContext();
+
+ final Map<String, Object> properties = provider.getProperties(propertyContext);
+
+ final Object securityProtocolProperty = properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName());
+ assertEquals(securityProtocol.name(), securityProtocolProperty);
+
+ final Object saslConfigProperty = properties.get(KafkaClientProperty.SASL_JAAS_CONFIG.getProperty());
+ assertNotNull(saslConfigProperty, "SASL configuration not found");
+ assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
+ }
+}