You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/11/21 14:36:08 UTC
[bahir-flink] branch master updated: Move SSL config to FlinkJedisConfigBase
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new ca1eb01 Move SSL config to FlinkJedisConfigBase
ca1eb01 is described below
commit ca1eb013163b8cfa94a817af072cb2cee9ebf040
Author: Sebastian Ramirez <sr...@brex.com>
AuthorDate: Thu Oct 20 16:49:51 2022 -0700
Move SSL config to FlinkJedisConfigBase
This allows us to support SSL connections in non-cluster configurations.
JedisSentinelPool currently doesn't support SSL connections, so Sentinel configurations always disable SSL.
---
.../common/config/FlinkJedisClusterConfig.java | 30 +++++++---------------
.../redis/common/config/FlinkJedisConfigBase.java | 13 +++++++++-
.../redis/common/config/FlinkJedisPoolConfig.java | 22 +++++++++++++---
.../common/config/FlinkJedisSentinelConfig.java | 2 +-
.../container/RedisCommandsContainerBuilder.java | 13 +++++++---
.../common/config/FlinkJedisConfigBaseTest.java | 2 +-
.../common/config/JedisClusterConfigTest.java | 6 ++---
7 files changed, 54 insertions(+), 34 deletions(-)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index cc7762a..2995572 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,8 +35,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
private final Set<InetSocketAddress> nodes;
private final int maxRedirections;
- private final boolean ssl;
-
/**
* Jedis cluster configuration.
@@ -49,22 +47,21 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
* @param password the password of redis cluster
- * @param ssl Whether SSL connection should be established, default value is false
+ * @param useSsl Whether SSL connection should be established, default value is false
* @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
* @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
* @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle, String password, boolean ssl,
+ int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl,
boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
Objects.requireNonNull(nodes, "Node information should be presented");
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
this.nodes = new HashSet<>(nodes);
this.maxRedirections = maxRedirections;
- this.ssl = ssl;
}
@@ -91,15 +88,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
return maxRedirections;
}
- /**
- * Returns ssl.
- *
- * @return ssl
- */
- public boolean getSsl() {
- return ssl;
- }
-
/**
* Builder for initializing {@link FlinkJedisClusterConfig}.
*/
@@ -114,7 +102,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
private String password;
- private boolean ssl = false;
+ private boolean useSsl = false;
/**
* Sets list of node.
@@ -200,11 +188,11 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
/**
* Sets value for the {@code ssl} configuration attribute.
*
- * @param ssl flag if an SSL connection should be established
+ * @param useSsl flag if an SSL connection should be established
* @return Builder itself
*/
- public Builder setSsl(boolean ssl){
- this.ssl = ssl;
+ public Builder setUseSsl(boolean useSsl){
+ this.useSsl = useSsl;
return this;
}
@@ -253,7 +241,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle);
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@@ -267,7 +255,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
", minIdle=" + minIdle +
", connectionTimeout=" + connectionTimeout +
", password=" + password +
- ", ssl=" + ssl +
+ ", useSsl=" + useSsl +
", testOnBorrow=" + testOnBorrow +
", testOnReturn=" + testOnReturn +
", testWhileIdle=" + testWhileIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index a41b0e0..4e68091 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -32,12 +32,13 @@ public abstract class FlinkJedisConfigBase implements Serializable {
protected final int minIdle;
protected final int connectionTimeout;
protected final String password;
+ protected final boolean useSsl;
protected final boolean testOnBorrow;
protected final boolean testOnReturn;
protected final boolean testWhileIdle;
- protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
@@ -52,6 +53,7 @@ public abstract class FlinkJedisConfigBase implements Serializable {
this.testOnReturn = testOnReturn;
this.testWhileIdle = testWhileIdle;
this.password = password;
+ this.useSsl = useSsl;
}
/**
@@ -108,6 +110,15 @@ public abstract class FlinkJedisConfigBase implements Serializable {
return password;
}
+
+ /**
+ * Returns whether the connection to Redis should use SSL.
+ * @return true if connection to Redis uses SSL, false otherwise
+ */
+ public boolean getUseSsl() {
+ return useSsl;
+ }
+
/**
* Get the value for the {@code testOnBorrow} configuration attribute
* for pools to be created with this configuration instance.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 5012da1..86c717b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -32,7 +32,6 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
private final int port;
private final int database;
-
/**
* Jedis pool configuration.
* The host is mandatory, and when host is not set, it throws NullPointerException.
@@ -41,6 +40,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
* @param port port, default value is 6379
* @param connectionTimeout socket / connection timeout, default value is 2000 milli second
* @param password password, if any
+ * @param useSsl Whether SSL connection should be established, default value is false
* @param database database index
* @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
* @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
@@ -50,10 +50,10 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
* @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code host} is {@code null}
*/
- private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
+ private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, boolean useSsl, int database,
int maxTotal, int maxIdle, int minIdle,
boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle);
Objects.requireNonNull(host, "Host information should be presented");
this.host = host;
@@ -104,6 +104,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+ private boolean useSsl = false;
/**
* Sets value for the {@code maxTotal} configuration attribute
@@ -235,13 +236,25 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
return this;
}
+
+ /**
+ * Sets value for the {@code useSsl} configuration attribute.
+ *
+ * @param useSsl flag if an SSL connection should be established
+ * @return Builder itself
+ */
+ public Builder setUseSsl(boolean useSsl) {
+ this.useSsl = useSsl;
+ return this;
+ }
+
/**
* Builds JedisPoolConfig.
*
* @return JedisPoolConfig
*/
public FlinkJedisPoolConfig build() {
- return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
+ return new FlinkJedisPoolConfig(host, port, timeout, password, useSsl, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@@ -250,6 +263,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
return "FlinkJedisPoolConfig{" +
"host=" + host +
", port=" + port +
+ ", useSsl=" + useSsl +
", database=" + database +
", maxTotal=" + maxTotal +
", maxIdle=" + maxIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 340eb4e..a6a29ff 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -63,7 +63,7 @@ public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
String password, int database,
int maxTotal, int maxIdle, int minIdle,
boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, false, testOnBorrow, testOnReturn, testWhileIdle);
Objects.requireNonNull(masterName, "Master name should be presented");
Objects.requireNonNull(sentinels, "Sentinels information should be presented");
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 7f5af3d..6dd43a9 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -69,8 +69,11 @@ public class RedisCommandsContainerBuilder {
GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
- jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
- jedisPoolConfig.getDatabase());
+ jedisPoolConfig.getPort(),
+ jedisPoolConfig.getConnectionTimeout(),
+ jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase(),
+ jedisPoolConfig.getUseSsl());
return new RedisContainer(jedisPool);
}
@@ -93,7 +96,7 @@ public class RedisCommandsContainerBuilder {
jedisClusterConfig.getPassword(),
DEFAULT_CLIENT_NAME,
genericObjectPoolConfig,
- jedisClusterConfig.getSsl());
+ jedisClusterConfig.getUseSsl());
return new RedisClusterContainer(jedisCluster);
}
@@ -109,6 +112,10 @@ public class RedisCommandsContainerBuilder {
GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
+ if (jedisSentinelConfig.getUseSsl()) {
+ throw new RuntimeException("JedisSentinelPool does not support SSL connections yet.");
+ }
+
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 80189df..54984d5 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
private class TestConfig extends FlinkJedisConfigBase {
protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", false, testOnBorrow, testOnReturn, testWhileIdle);
}
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 76208b9..7cefc1f 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -94,9 +94,9 @@ public class JedisClusterConfigTest extends TestLogger {
.setMaxTotal(0)
.setTimeout(0)
.setNodes(set)
- .setSsl(true)
+ .setUseSsl(true)
.build();
- assertTrue(clusterConfig.getSsl());
+ assertTrue(clusterConfig.getUseSsl());
}
@Test
@@ -111,7 +111,7 @@ public class JedisClusterConfigTest extends TestLogger {
.setTimeout(0)
.setNodes(set)
.build();
- assertFalse(clusterConfig.getSsl());
+ assertFalse(clusterConfig.getUseSsl());
}
}