You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/04/12 16:39:21 UTC

[nifi] branch main updated: NIFI-8410: Enabling TLS in RedisStateProvider

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2298953  NIFI-8410: Enabling TLS in RedisStateProvider
2298953 is described below

commit 2298953f90e8023afc4f032dd1dad99c054de5c3
Author: Joe Gresock <jg...@cloudera.com>
AuthorDate: Sat Apr 10 11:55:33 2021 -0400

    NIFI-8410: Enabling TLS in RedisStateProvider
    
    This closes #4990
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../src/main/resources/conf/state-management.xml   |  6 +-
 .../redis/service/RedisConnectionPoolService.java  | 10 ++-
 .../nifi/redis/state/RedisStateProvider.java       | 32 +++++++-
 .../org/apache/nifi/redis/util/RedisUtils.java     |  7 +-
 .../service/TestRedisConnectionPoolService.java    |  8 +-
 .../nifi/redis/state/TestRedisStateProvider.java   | 95 ++++++++++++++++++++++
 6 files changed, 147 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index dcd7ee6..85fb6c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -95,6 +95,10 @@
 
             Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf.
 
+            Enable TLS - If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in
+                    nifi.properties.  This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode.
+                    If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false).
+
             Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
                         A negative value indicates that there is no limit.
 
@@ -131,4 +135,4 @@
         </cluster-provider>
     -->
 
-</stateManagement>
\ No newline at end of file
+</stateManagement>
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
index 68169f9..83937fb 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
@@ -29,9 +29,11 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.ssl.SSLContextService;
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 
+import javax.net.ssl.SSLContext;
 import java.util.Collection;
 import java.util.List;
 
@@ -42,6 +44,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
     private volatile PropertyContext context;
     private volatile RedisType redisType;
     private volatile JedisConnectionFactory connectionFactory;
+    private volatile SSLContext sslContext;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -56,6 +59,10 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         this.context = context;
+        if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
+            final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            this.sslContext = sslContextService.createContext();
+        }
 
         final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
         this.redisType = RedisType.fromDisplayName(redisMode);
@@ -68,6 +75,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
             connectionFactory = null;
             redisType = null;
             context = null;
+            sslContext = null;
         }
     }
 
@@ -81,7 +89,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
         if (connectionFactory == null) {
             synchronized (this) {
                 if (connectionFactory == null) {
-                    connectionFactory = RedisUtils.createConnectionFactory(context, getLogger());
+                    connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext);
                 }
             }
         }
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index c23bb81..6d552dc 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -33,6 +33,7 @@ import org.apache.nifi.redis.util.RedisUtils;
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 
+import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -57,11 +58,22 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
             .defaultValue("nifi/components/")
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
+    public static final PropertyDescriptor ENABLE_TLS = new PropertyDescriptor.Builder()
+            .name("Enable TLS")
+            .displayName("Enable TLS")
+            .description("If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in " +
+                    "nifi.properties.  This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode. " +
+                    "If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure.")
+            .required(true)
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
 
     static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
         props.add(KEY_PREFIX);
+        props.add(ENABLE_TLS);
         STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
     }
 
@@ -69,6 +81,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
     private String keyPrefix;
     private ComponentLog logger;
     private PropertyContext context;
+    private SSLContext sslContext;
 
     private volatile boolean enabled;
     private volatile JedisConnectionFactory connectionFactory;
@@ -76,8 +89,11 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
     private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe();
 
     @Override
-    public final void initialize(final StateProviderInitializationContext context) throws IOException {
+    public final void initialize(final StateProviderInitializationContext context) {
         this.context = context;
+        if (context.getProperty(ENABLE_TLS).asBoolean()) {
+            this.sslContext = context.getSSLContext();
+        }
         this.identifier = context.getIdentifier();
         this.logger = context.getLogger();
 
@@ -98,7 +114,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
         final List<ValidationResult> results = new ArrayList<>(RedisUtils.validate(validationContext));
 
         final RedisType redisType = RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue());
-        if (redisType != null && redisType == RedisType.CLUSTER) {
+        if (redisType == RedisType.CLUSTER) {
             results.add(new ValidationResult.Builder()
                     .subject(RedisUtils.REDIS_MODE.getDisplayName())
                     .valid(false)
@@ -106,6 +122,16 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
                             + " is configured in clustered mode, and this service requires a non-clustered Redis")
                     .build());
         }
+        final boolean enableTls = validationContext.getProperty(ENABLE_TLS).asBoolean();
+        if (enableTls && sslContext == null) {
+            results.add(new ValidationResult.Builder()
+                    .subject(ENABLE_TLS.getDisplayName())
+                    .valid(false)
+                    .explanation(ENABLE_TLS.getDisplayName()
+                            + " is set to 'true', but Apache NiFi is not secured.  This state provider can only use a TLS-enabled connection " +
+                            "if a keystore and truststore are provided in nifi.properties.")
+                    .build());
+        }
 
         return results;
     }
@@ -274,7 +300,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
     // visible for testing
     synchronized RedisConnection getRedis() {
         if (connectionFactory == null) {
-            connectionFactory = RedisUtils.createConnectionFactory(context, logger);
+            connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext);
         }
 
         return connectionFactory.getConnection();
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index 1b5bab8..dd01977 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -26,7 +26,6 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
-import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
 import org.springframework.data.redis.connection.RedisClusterConfiguration;
 import org.springframework.data.redis.connection.RedisConfiguration;
@@ -273,7 +272,7 @@ public class RedisUtils {
     }
 
 
-    public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) {
+    public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) {
         final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
         final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
         final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
@@ -288,9 +287,7 @@ public class RedisUtils {
                 .poolConfig(poolConfig)
                 .and();
 
-        if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
-            final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-            final SSLContext sslContext = sslContextService.createContext();
+        if (sslContext != null) {
             builder = builder.useSsl()
                     .sslParameters(sslContext.getSupportedSSLParameters())
                     .sslSocketFactory(sslContext.getSocketFactory())
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
index 7223e65..40d4c42 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
@@ -20,6 +20,7 @@ import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.util.RedisUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
+import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.StandardProcessorTestRunner;
@@ -118,7 +119,12 @@ public class TestRedisConnectionPoolService {
         MockProcessContext processContext = ((StandardProcessorTestRunner) testRunner).getProcessContext();
         MockConfigurationContext configContext = new MockConfigurationContext(processContext.getControllerServices()
                 .get(redisService.getIdentifier()).getProperties(), processContext);
-        JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger());
+        SSLContext providedSslContext = null;
+        if (configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
+            final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            providedSslContext = sslContextService.createContext();
+        }
+        JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext);
         return connectionFactory;
     }
 
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
new file mode 100644
index 0000000..45fd899
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.state;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.Collection;
+
+public class TestRedisStateProvider {
+
+    private RedisStateProvider redisStateProvider;
+    private StateProviderInitializationContext context;
+    private ValidationContext validationContext;
+
+    @Before
+    public void init() {
+        context = Mockito.mock(StateProviderInitializationContext.class);
+        redisStateProvider = new RedisStateProvider();
+        validationContext = Mockito.mock(ValidationContext.class);
+
+        // Set up mock state provider init context
+        Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/"));
+
+        // Set up mock validation context
+        Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379"));
+        Mockito.when(validationContext.getProperty(RedisUtils.REDIS_MODE)).thenReturn(new MockPropertyValue("Standalone"));
+        Mockito.when(validationContext.getProperty(RedisUtils.DATABASE)).thenReturn(new MockPropertyValue("0"));
+    }
+
+    private void enableTls(boolean enable) {
+        Mockito.when(validationContext.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable)));
+        Mockito.when(context.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable)));
+
+        if (enable) {
+            SSLContext sslContext = Mockito.mock(SSLContext.class);
+            Mockito.when(context.getSSLContext()).thenReturn(sslContext);
+        }
+    }
+
+    @Test
+    public void customValidate_enabledTlsSuccess() throws IOException {
+        this.enableTls(true);
+
+        redisStateProvider.initialize(context);
+
+        Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
+        Assert.assertTrue(results.isEmpty());
+    }
+
+    @Test
+    public void customValidate_disableTlsSuccess() throws IOException {
+        this.enableTls(false);
+
+        redisStateProvider.initialize(context);
+
+        Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
+        Assert.assertTrue(results.isEmpty());
+    }
+
+    @Test
+    public void customValidate_enableTlsButNoSslContext() throws IOException {
+        this.enableTls(true);
+
+        Mockito.when(context.getSSLContext()).thenReturn(null);
+
+        redisStateProvider.initialize(context);
+
+        Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
+        Assert.assertEquals(1, results.size());
+    }
+}