You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/11/21 14:40:49 UTC

[bahir-flink] branch master updated (ca1eb01 -> 510a2b1)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


 discard ca1eb01  Move SSL config to FlinkJedisConfigBase
     new 510a2b1  [BAHIR-315] Move SSL config to FlinkJedisConfigBase

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ca1eb01)
            \
             N -- N -- N   refs/heads/master (510a2b1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[bahir-flink] 01/01: [BAHIR-315] Move SSL config to FlinkJedisConfigBase

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 510a2b18ea8ad42932a37604b57ad5d47fb62a03
Author: Sebastian Ramirez <sr...@brex.com>
AuthorDate: Thu Oct 20 16:49:51 2022 -0700

    [BAHIR-315] Move SSL config to FlinkJedisConfigBase
    
    This allows us to support SSL connections in non-cluster configurations.
    
    JedisSentinelPool currently doesn't support SSL connections.
---
 .../common/config/FlinkJedisClusterConfig.java     | 30 +++++++---------------
 .../redis/common/config/FlinkJedisConfigBase.java  | 13 +++++++++-
 .../redis/common/config/FlinkJedisPoolConfig.java  | 22 +++++++++++++---
 .../common/config/FlinkJedisSentinelConfig.java    |  2 +-
 .../container/RedisCommandsContainerBuilder.java   | 13 +++++++---
 .../common/config/FlinkJedisConfigBaseTest.java    |  2 +-
 .../common/config/JedisClusterConfigTest.java      |  6 ++---
 7 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index cc7762a..2995572 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,8 +35,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
 
     private final Set<InetSocketAddress> nodes;
     private final int maxRedirections;
-    private final boolean ssl;
-
 
     /**
      * Jedis cluster configuration.
@@ -49,22 +47,21 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
      * @param password the password of redis cluster
-     * @param ssl Whether SSL connection should be established, default value is false
+     * @param useSsl Whether SSL connection should be established, default value is false
      * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
      * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
      * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle, String password, boolean ssl,
+                                    int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl,
                                     boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
         this.nodes = new HashSet<>(nodes);
         this.maxRedirections = maxRedirections;
-        this.ssl = ssl;
     }
 
 
@@ -91,15 +88,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         return maxRedirections;
     }
 
-    /**
-     * Returns ssl.
-     *
-     * @return ssl
-     */
-    public boolean getSsl() {
-        return ssl;
-    }
-
     /**
      * Builder for initializing  {@link FlinkJedisClusterConfig}.
      */
@@ -114,7 +102,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
-        private boolean ssl = false;
+        private boolean useSsl = false;
 
         /**
          * Sets list of node.
@@ -200,11 +188,11 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         /**
          * Sets value for the {@code ssl} configuration attribute.
          *
-         * @param ssl flag if an SSL connection should be established
+         * @param useSsl flag if an SSL connection should be established
          * @return Builder itself
          */
-        public Builder setSsl(boolean ssl){
-            this.ssl = ssl;
+        public Builder setUseSsl(boolean useSsl){
+            this.useSsl = useSsl;
             return this;
         }
 
@@ -253,7 +241,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
@@ -267,7 +255,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
           ", minIdle=" + minIdle +
           ", connectionTimeout=" + connectionTimeout +
           ", password=" + password +
-          ", ssl=" + ssl +
+          ", useSsl=" + useSsl +
           ", testOnBorrow=" + testOnBorrow +
           ", testOnReturn=" + testOnReturn +
           ", testWhileIdle=" + testWhileIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index a41b0e0..4e68091 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -32,12 +32,13 @@ public abstract class FlinkJedisConfigBase implements Serializable {
     protected final int minIdle;
     protected final int connectionTimeout;
     protected final String password;
+    protected final boolean useSsl;
 
     protected final boolean testOnBorrow;
     protected final boolean testOnReturn;
     protected final boolean testWhileIdle;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
 
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
@@ -52,6 +53,7 @@ public abstract class FlinkJedisConfigBase implements Serializable {
         this.testOnReturn = testOnReturn;
         this.testWhileIdle = testWhileIdle;
         this.password = password;
+        this.useSsl = useSsl;
     }
 
     /**
@@ -108,6 +110,15 @@ public abstract class FlinkJedisConfigBase implements Serializable {
         return password;
     }
 
+
+    /**
+     * Whether connection to Redis should use SSL
+     * @return true if connection to Redis uses SSL, false otherwise
+     */
+    public boolean getUseSsl() {
+        return useSsl;
+    }
+
     /**
      * Get the value for the {@code testOnBorrow} configuration attribute
      * for pools to be created with this configuration instance.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 5012da1..86c717b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -32,7 +32,6 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
     private final int port;
     private final int database;
 
-
     /**
      * Jedis pool configuration.
      * The host is mandatory, and when host is not set, it throws NullPointerException.
@@ -41,6 +40,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
      * @param port port, default value is 6379
      * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
      * @param password password, if any
+     * @param useSsl Whether SSL connection should be established, default value is false
      * @param database database index
      * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
      * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
@@ -50,10 +50,10 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
      * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code host} is {@code null}
      */
-    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
+    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, boolean useSsl, int database,
                                  int maxTotal, int maxIdle, int minIdle,
                                  boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
@@ -104,6 +104,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
         private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
         private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+        private boolean useSsl = false;
 
         /**
          * Sets value for the {@code maxTotal} configuration attribute
@@ -235,13 +236,25 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
             return this;
         }
 
+
+        /**
+         * Sets value for the {@code ssl} configuration attribute.
+         *
+         * @param useSsl flag if an SSL connection should be established
+         * @return Builder itself
+         */
+        public Builder setUseSsl(boolean useSsl) {
+            this.useSsl = useSsl;
+            return this;
+        }
+
         /**
          * Builds JedisPoolConfig.
          *
          * @return JedisPoolConfig
          */
         public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
+            return new FlinkJedisPoolConfig(host, port, timeout, password, useSsl, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
@@ -250,6 +263,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
         return "FlinkJedisPoolConfig{" +
           "host=" + host +
           ", port=" + port +
+          ", useSsl=" + useSsl +
           ", database=" + database +
           ", maxTotal=" + maxTotal +
           ", maxIdle=" + maxIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 340eb4e..a6a29ff 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -63,7 +63,7 @@ public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
                                      String password, int database,
                                      int maxTotal, int maxIdle, int minIdle,
                                      boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, false, testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be presented");
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 7f5af3d..6dd43a9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -69,8 +69,11 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
-          jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
-          jedisPoolConfig.getDatabase());
+          jedisPoolConfig.getPort(),
+          jedisPoolConfig.getConnectionTimeout(),
+          jedisPoolConfig.getPassword(),
+          jedisPoolConfig.getDatabase(),
+          jedisPoolConfig.getUseSsl());
         return new RedisContainer(jedisPool);
     }
 
@@ -93,7 +96,7 @@ public class RedisCommandsContainerBuilder {
           jedisClusterConfig.getPassword(),
           DEFAULT_CLIENT_NAME,
           genericObjectPoolConfig,
-          jedisClusterConfig.getSsl());
+          jedisClusterConfig.getUseSsl());
         return new RedisClusterContainer(jedisCluster);
     }
 
@@ -109,6 +112,10 @@ public class RedisCommandsContainerBuilder {
 
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
 
+        if (jedisSentinelConfig.getUseSsl()) {
+            throw new RuntimeException("JedisSentinelPool does not support SSL connections yet.");
+        }
+
         JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
           jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
           jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 80189df..54984d5 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
     private class TestConfig extends FlinkJedisConfigBase {
         protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
                              boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", false, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 76208b9..7cefc1f 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -94,9 +94,9 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setMaxTotal(0)
                 .setTimeout(0)
                 .setNodes(set)
-                .setSsl(true)
+                .setUseSsl(true)
                 .build();
-        assertTrue(clusterConfig.getSsl());
+        assertTrue(clusterConfig.getUseSsl());
     }
 
     @Test
@@ -111,7 +111,7 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setTimeout(0)
                 .setNodes(set)
                 .build();
-        assertFalse(clusterConfig.getSsl());
+        assertFalse(clusterConfig.getUseSsl());
     }
 
 }