You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/12/01 16:30:07 UTC

[nifi] branch main updated: NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components

This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c3b0e1a790 NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components
c3b0e1a790 is described below

commit c3b0e1a790832cdbb9bc5bec336dc27859313c0e
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 1 08:15:20 2022 -0600

    NIFI-10919 Corrected SCRAM SASL Mechanism for Kafka Components
    
    This closes #6743
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../login/DelegatingLoginConfigProvider.java       |  2 +-
 .../nifi/kafka/shared/property/SaslMechanism.java  | 10 +++
 .../StandardKafkaPropertyNameProvider.java         | 16 +++--
 .../provider/StandardKafkaPropertyProvider.java    |  2 +-
 .../login/DelegatingLoginConfigProviderTest.java   | 69 +++++++++++++++++++
 .../StandardKafkaPropertyProviderTest.java         | 80 ++++++++++++++++++++++
 6 files changed, 171 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index c9b81bc595..2be8274606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -47,7 +47,7 @@ public class DelegatingLoginConfigProvider implements LoginConfigProvider {
     @Override
     public String getConfiguration(final PropertyContext context) {
         final String saslMechanismProperty = context.getProperty(KafkaClientComponent.SASL_MECHANISM).getValue();
-        final SaslMechanism saslMechanism = SaslMechanism.valueOf(saslMechanismProperty);
+        final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(saslMechanismProperty);
         final LoginConfigProvider loginConfigProvider = PROVIDERS.getOrDefault(saslMechanism, SCRAM_PROVIDER);
         return loginConfigProvider.getConfiguration(context);
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index 619daeb86a..a9da7714e3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -18,6 +18,9 @@ package org.apache.nifi.kafka.shared.property;
 
 import org.apache.nifi.components.DescribedValue;
 
+import java.util.Arrays;
+import java.util.Optional;
+
 /**
  * Enumeration of supported Kafka SASL Mechanisms
  */
@@ -42,6 +45,13 @@ public enum SaslMechanism implements DescribedValue {
         this.description = description;
     }
 
+    public static SaslMechanism getSaslMechanism(final String value) {
+        final Optional<SaslMechanism> foundSaslMechanism = Arrays.stream(SaslMechanism.values())
+                .filter(saslMechanism -> saslMechanism.getValue().equals(value))
+                .findFirst();
+        return foundSaslMechanism.orElseThrow(() -> new IllegalArgumentException(String.format("SaslMechanism value [%s] not found", value)));
+    }
+
     @Override
     public String getValue() {
         return value;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
index 10979716a4..2083bfc7e7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyNameProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.LinkedHashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -58,9 +59,12 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
         final Set<String> propertyNames = new LinkedHashSet<>();
 
         for (final String propertyClassName : PROPERTY_CLASSES) {
-            final Class<?> propertyClass = getClass(propertyClassName);
-            final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
-            propertyNames.addAll(classPropertyNames);
+            final Optional<Class<?>> propertyClassFound = findClass(propertyClassName);
+            if (propertyClassFound.isPresent()) {
+                final Class<?> propertyClass = propertyClassFound.get();
+                final Set<String> classPropertyNames = getStaticStringPropertyNames(propertyClass);
+                propertyNames.addAll(classPropertyNames);
+            }
         }
 
         return propertyNames;
@@ -93,11 +97,11 @@ public class StandardKafkaPropertyNameProvider implements KafkaPropertyNameProvi
         }
     }
 
-    private static Class<?> getClass(final String className) {
+    private static Optional<Class<?>> findClass(final String className) {
         try {
-            return Class.forName(className);
+            return Optional.of(Class.forName(className));
         } catch (final ClassNotFoundException e) {
-            throw new IllegalStateException("Kafka Configuration Class not found", e);
+            return Optional.empty();
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index a9ae4d6320..fd06acb6d0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -83,7 +83,7 @@ public class StandardKafkaPropertyProvider implements KafkaPropertyProvider {
             final String loginConfig = LOGIN_CONFIG_PROVIDER.getConfiguration(context);
             properties.put(SASL_JAAS_CONFIG.getProperty(), loginConfig);
 
-            final SaslMechanism saslMechanism = SaslMechanism.valueOf(context.getProperty(SASL_MECHANISM).getValue());
+            final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue());
             if (SaslMechanism.GSSAPI == saslMechanism && isCustomKerberosLoginFound()) {
                 properties.put(SASL_LOGIN_CLASS.getProperty(), SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java
new file mode 100644
index 0000000000..fef038a0ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProviderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DelegatingLoginConfigProviderTest {
+
+    private static final String PLAIN_LOGIN_MODULE = "PlainLoginModule";
+
+    private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
+
+    DelegatingLoginConfigProvider provider;
+
+    TestRunner runner;
+
+    @BeforeEach
+    void setProvider() {
+        provider = new DelegatingLoginConfigProvider();
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+    }
+
+    @Test
+    void testGetConfigurationPlain() {
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.PLAIN.getValue());
+        final PropertyContext propertyContext = runner.getProcessContext();
+
+        final String configuration = provider.getConfiguration(propertyContext);
+
+        assertNotNull(configuration);
+        assertTrue(configuration.contains(PLAIN_LOGIN_MODULE), "PLAIN configuration not found");
+    }
+
+    @Test
+    void testGetConfigurationScram() {
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_512.getValue());
+        final PropertyContext propertyContext = runner.getProcessContext();
+
+        final String configuration = provider.getConfiguration(propertyContext);
+
+        assertNotNull(configuration);
+        assertTrue(configuration.contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
new file mode 100644
index 0000000000..bce3412186
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProviderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.property.provider;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class StandardKafkaPropertyProviderTest {
+
+    private static final String SCRAM_LOGIN_MODULE = "ScramLoginModule";
+
+    StandardKafkaPropertyProvider provider;
+
+    TestRunner runner;
+
+    @BeforeEach
+    void setProvider() {
+        provider = new StandardKafkaPropertyProvider(String.class);
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+    }
+
+    @Test
+    void testGetProperties() {
+        final SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+
+        runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
+        final PropertyContext propertyContext = runner.getProcessContext();
+
+        final Map<String, Object> properties = provider.getProperties(propertyContext);
+
+        assertEquals(securityProtocol.name(), properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName()));
+    }
+
+    @Test
+    void testGetPropertiesSaslMechanismScram() {
+        final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+
+        runner.setProperty(KafkaClientComponent.SECURITY_PROTOCOL, securityProtocol.name());
+        runner.setProperty(KafkaClientComponent.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_256.getValue());
+        final PropertyContext propertyContext = runner.getProcessContext();
+
+        final Map<String, Object> properties = provider.getProperties(propertyContext);
+
+        final Object securityProtocolProperty = properties.get(KafkaClientComponent.SECURITY_PROTOCOL.getName());
+        assertEquals(securityProtocol.name(), securityProtocolProperty);
+
+        final Object saslConfigProperty = properties.get(KafkaClientProperty.SASL_JAAS_CONFIG.getProperty());
+        assertNotNull(saslConfigProperty, "SASL configuration not found");
+        assertTrue(saslConfigProperty.toString().contains(SCRAM_LOGIN_MODULE), "SCRAM configuration not found");
+    }
+}