You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/11/21 14:36:08 UTC

[bahir-flink] branch master updated: Move SSL config to FlinkJedisConfigBase

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ca1eb01  Move SSL config to FlinkJedisConfigBase
ca1eb01 is described below

commit ca1eb013163b8cfa94a817af072cb2cee9ebf040
Author: Sebastian Ramirez <sr...@brex.com>
AuthorDate: Thu Oct 20 16:49:51 2022 -0700

    Move SSL config to FlinkJedisConfigBase
    
    This allows us to support SSL connections in non-cluster configurations.
    
    JedisSentinelPool currently doesn't support SSL connections.
---
 .../common/config/FlinkJedisClusterConfig.java     | 30 +++++++---------------
 .../redis/common/config/FlinkJedisConfigBase.java  | 13 +++++++++-
 .../redis/common/config/FlinkJedisPoolConfig.java  | 22 +++++++++++++---
 .../common/config/FlinkJedisSentinelConfig.java    |  2 +-
 .../container/RedisCommandsContainerBuilder.java   | 13 +++++++---
 .../common/config/FlinkJedisConfigBaseTest.java    |  2 +-
 .../common/config/JedisClusterConfigTest.java      |  6 ++---
 7 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index cc7762a..2995572 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,8 +35,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
 
     private final Set<InetSocketAddress> nodes;
     private final int maxRedirections;
-    private final boolean ssl;
-
 
     /**
      * Jedis cluster configuration.
@@ -49,22 +47,21 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
      * @param password the password of redis cluster
-     * @param ssl Whether SSL connection should be established, default value is false
+     * @param useSsl Whether SSL connection should be established, default value is false
      * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
      * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
      * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle, String password, boolean ssl,
+                                    int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl,
                                     boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
         this.nodes = new HashSet<>(nodes);
         this.maxRedirections = maxRedirections;
-        this.ssl = ssl;
     }
 
 
@@ -91,15 +88,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         return maxRedirections;
     }
 
-    /**
-     * Returns ssl.
-     *
-     * @return ssl
-     */
-    public boolean getSsl() {
-        return ssl;
-    }
-
     /**
      * Builder for initializing  {@link FlinkJedisClusterConfig}.
      */
@@ -114,7 +102,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
-        private boolean ssl = false;
+        private boolean useSsl = false;
 
         /**
          * Sets list of node.
@@ -200,11 +188,11 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         /**
          * Sets value for the {@code ssl} configuration attribute.
          *
-         * @param ssl flag if an SSL connection should be established
+         * @param useSsl flag if an SSL connection should be established
          * @return Builder itself
          */
-        public Builder setSsl(boolean ssl){
-            this.ssl = ssl;
+        public Builder setUseSsl(boolean useSsl){
+            this.useSsl = useSsl;
             return this;
         }
 
@@ -253,7 +241,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
@@ -267,7 +255,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
           ", minIdle=" + minIdle +
           ", connectionTimeout=" + connectionTimeout +
           ", password=" + password +
-          ", ssl=" + ssl +
+          ", useSsl=" + useSsl +
           ", testOnBorrow=" + testOnBorrow +
           ", testOnReturn=" + testOnReturn +
           ", testWhileIdle=" + testWhileIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index a41b0e0..4e68091 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -32,12 +32,13 @@ public abstract class FlinkJedisConfigBase implements Serializable {
     protected final int minIdle;
     protected final int connectionTimeout;
     protected final String password;
+    protected final boolean useSsl;
 
     protected final boolean testOnBorrow;
     protected final boolean testOnReturn;
     protected final boolean testWhileIdle;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
 
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
@@ -52,6 +53,7 @@ public abstract class FlinkJedisConfigBase implements Serializable {
         this.testOnReturn = testOnReturn;
         this.testWhileIdle = testWhileIdle;
         this.password = password;
+        this.useSsl = useSsl;
     }
 
     /**
@@ -108,6 +110,15 @@ public abstract class FlinkJedisConfigBase implements Serializable {
         return password;
     }
 
+
+    /**
+     * Whether connection to Redis should use SSL
+     * @return true if connection to Redis uses SSL, false otherwise
+     */
+    public boolean getUseSsl() {
+        return useSsl;
+    }
+
     /**
      * Get the value for the {@code testOnBorrow} configuration attribute
      * for pools to be created with this configuration instance.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 5012da1..86c717b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -32,7 +32,6 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
     private final int port;
     private final int database;
 
-
     /**
      * Jedis pool configuration.
      * The host is mandatory, and when host is not set, it throws NullPointerException.
@@ -41,6 +40,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
      * @param port port, default value is 6379
      * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
      * @param password password, if any
+     * @param useSsl Whether SSL connection should be established, default value is false
      * @param database database index
      * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
      * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
@@ -50,10 +50,10 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
      * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code host} is {@code null}
      */
-    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
+    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, boolean useSsl, int database,
                                  int maxTotal, int maxIdle, int minIdle,
                                  boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
@@ -104,6 +104,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
         private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
         private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+        private boolean useSsl = false;
 
         /**
          * Sets value for the {@code maxTotal} configuration attribute
@@ -235,13 +236,25 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
             return this;
         }
 
+
+        /**
+         * Sets value for the {@code ssl} configuration attribute.
+         *
+         * @param useSsl flag if an SSL connection should be established
+         * @return Builder itself
+         */
+        public Builder setUseSsl(boolean useSsl) {
+            this.useSsl = useSsl;
+            return this;
+        }
+
         /**
          * Builds JedisPoolConfig.
          *
          * @return JedisPoolConfig
          */
         public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
+            return new FlinkJedisPoolConfig(host, port, timeout, password, useSsl, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
@@ -250,6 +263,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
         return "FlinkJedisPoolConfig{" +
           "host=" + host +
           ", port=" + port +
+          ", useSsl=" + useSsl +
           ", database=" + database +
           ", maxTotal=" + maxTotal +
           ", maxIdle=" + maxIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 340eb4e..a6a29ff 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -63,7 +63,7 @@ public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
                                      String password, int database,
                                      int maxTotal, int maxIdle, int minIdle,
                                      boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, false, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be presented");
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 7f5af3d..6dd43a9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -69,8 +69,11 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
-          jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
-          jedisPoolConfig.getDatabase());
+          jedisPoolConfig.getPort(),
+          jedisPoolConfig.getConnectionTimeout(),
+          jedisPoolConfig.getPassword(),
+          jedisPoolConfig.getDatabase(),
+          jedisPoolConfig.getUseSsl());
         return new RedisContainer(jedisPool);
     }
 
@@ -93,7 +96,7 @@ public class RedisCommandsContainerBuilder {
           jedisClusterConfig.getPassword(),
           DEFAULT_CLIENT_NAME,
           genericObjectPoolConfig,
-          jedisClusterConfig.getSsl());
+          jedisClusterConfig.getUseSsl());
         return new RedisClusterContainer(jedisCluster);
     }
 
@@ -109,6 +112,10 @@ public class RedisCommandsContainerBuilder {
 
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
 
+        if (jedisSentinelConfig.getUseSsl()) {
+            throw new RuntimeException("JedisSentinelPool does not support SSL connections yet.");
+        }
+
         JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
           jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
           jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 80189df..54984d5 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
     private class TestConfig extends FlinkJedisConfigBase {
         protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
                              boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", false, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 76208b9..7cefc1f 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -94,9 +94,9 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setMaxTotal(0)
                 .setTimeout(0)
                 .setNodes(set)
-                .setSsl(true)
+                .setUseSsl(true)
                 .build();
-        assertTrue(clusterConfig.getSsl());
+        assertTrue(clusterConfig.getUseSsl());
     }
 
     @Test
@@ -111,7 +111,7 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setTimeout(0)
                 .setNodes(set)
                 .build();
-        assertFalse(clusterConfig.getSsl());
+        assertFalse(clusterConfig.getUseSsl());
     }
 
 }