You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/10/14 15:21:20 UTC
[bahir-flink] 02/02: [BAHIR-315] add support for SSL connection
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit 522624f481eaf8fdaced084bf166b0e84c664b68
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:46:07 2022 +0200
[BAHIR-315] add support for SSL connection
---
.../common/config/FlinkJedisClusterConfig.java | 28 ++++++++++++++++--
.../container/RedisCommandsContainerBuilder.java | 3 +-
.../common/config/JedisClusterConfigTest.java | 33 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 0840deb..cc7762a 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,6 +35,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
private final Set<InetSocketAddress> nodes;
private final int maxRedirections;
+ private final boolean ssl;
/**
@@ -48,13 +49,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
* @param password the password of redis cluster
+ * @param ssl Whether SSL connection should be established, default value is false
* @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
* @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
* @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle, String password,
+ int maxTotal, int maxIdle, int minIdle, String password, boolean ssl,
boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
@@ -62,6 +64,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
this.nodes = new HashSet<>(nodes);
this.maxRedirections = maxRedirections;
+ this.ssl = ssl;
}
@@ -88,6 +91,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
return maxRedirections;
}
+ /**
+ * Returns ssl.
+ *
+ * @return ssl
+ */
+ public boolean getSsl() {
+ return ssl;
+ }
/**
* Builder for initializing {@link FlinkJedisClusterConfig}.
@@ -103,6 +114,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
private String password;
+ private boolean ssl = false;
/**
* Sets list of node.
@@ -185,6 +197,17 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
return this;
}
+ /**
+ * Sets value for the {@code ssl} configuration attribute.
+ *
+ * @param ssl flag if an SSL connection should be established
+ * @return Builder itself
+ */
+ public Builder setSsl(boolean ssl){
+ this.ssl = ssl;
+ return this;
+ }
+
/**
* Sets value for the {@code testOnBorrow} configuration attribute
* for pools to be created with this configuration instance.
@@ -230,7 +253,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@@ -244,6 +267,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
", minIdle=" + minIdle +
", connectionTimeout=" + connectionTimeout +
", password=" + password +
+ ", ssl=" + ssl +
", testOnBorrow=" + testOnBorrow +
", testOnReturn=" + testOnReturn +
", testWhileIdle=" + testWhileIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0ffbbe0..7f5af3d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -92,7 +92,8 @@ public class RedisCommandsContainerBuilder {
jedisClusterConfig.getMaxRedirections(),
jedisClusterConfig.getPassword(),
DEFAULT_CLIENT_NAME,
- genericObjectPoolConfig);
+ genericObjectPoolConfig,
+ jedisClusterConfig.getSsl());
return new RedisClusterContainer(jedisCluster);
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index d64be84..76208b9 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -25,6 +25,8 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class JedisClusterConfigTest extends TestLogger {
@@ -81,4 +83,35 @@ public class JedisClusterConfigTest extends TestLogger {
assertNull(clusterConfig.getPassword());
}
+ @Test
+ public void shouldSetSslSuccessfully() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .setSsl(true)
+ .build();
+ assertTrue(clusterConfig.getSsl());
+ }
+
+ @Test
+ public void shouldSslNotBeenSet() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .build();
+ assertFalse(clusterConfig.getSsl());
+ }
+
}