You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/10/14 15:21:18 UTC

[bahir-flink] branch master updated (efd9520 -> 522624f)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


    from efd9520  [BAHIR-312] Add license header to README.md files
     new ef3d18b  [BAHIR-315] update jedis dependency to 4.3.0
     new 522624f  [BAHIR-315] add support for SSL connection

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connector-redis/pom.xml                      |  2 +-
 .../common/config/FlinkJedisClusterConfig.java     | 28 ++++++++++++++++--
 .../common/container/RedisClusterContainer.java    |  9 +-----
 .../container/RedisCommandsContainerBuilder.java   | 17 +++++++----
 .../connectors/redis/RedisSinkITCase.java          | 24 ++++++++--------
 .../streaming/connectors/redis/RedisSinkTest.java  |  4 +--
 .../common/config/JedisClusterConfigTest.java      | 33 ++++++++++++++++++++++
 .../RedisCommandsContainerBuilderTest.java         |  7 +++--
 8 files changed, 90 insertions(+), 34 deletions(-)


[bahir-flink] 02/02: [BAHIR-315] add support for SSL connection

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 522624f481eaf8fdaced084bf166b0e84c664b68
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:46:07 2022 +0200

    [BAHIR-315] add support for SSL connection
---
 .../common/config/FlinkJedisClusterConfig.java     | 28 ++++++++++++++++--
 .../container/RedisCommandsContainerBuilder.java   |  3 +-
 .../common/config/JedisClusterConfigTest.java      | 33 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 0840deb..cc7762a 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,6 +35,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
 
     private final Set<InetSocketAddress> nodes;
     private final int maxRedirections;
+    private final boolean ssl;
 
 
     /**
@@ -48,13 +49,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the pool
      * @param password the password of redis cluster
+     * @param ssl Whether SSL connection should be established, default value is false
      * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
      * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
      * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle, String password,
+                                    int maxTotal, int maxIdle, int minIdle, String password, boolean ssl,
                                     boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
         super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
 
@@ -62,6 +64,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
         this.nodes = new HashSet<>(nodes);
         this.maxRedirections = maxRedirections;
+        this.ssl = ssl;
     }
 
 
@@ -88,6 +91,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         return maxRedirections;
     }
 
+    /**
+     * Returns ssl.
+     *
+     * @return ssl
+     */
+    public boolean getSsl() {
+        return ssl;
+    }
 
     /**
      * Builder for initializing  {@link FlinkJedisClusterConfig}.
@@ -103,6 +114,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
         private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
+        private boolean ssl = false;
 
         /**
          * Sets list of node.
@@ -185,6 +197,17 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
             return this;
         }
 
+        /**
+         * Sets value for the {@code ssl} configuration attribute.
+         *
+         * @param ssl flag if an SSL connection should be established
+         * @return Builder itself
+         */
+        public Builder setSsl(boolean ssl){
+            this.ssl = ssl;
+            return this;
+        }
+
         /**
          * Sets value for the {@code testOnBorrow} configuration attribute
          * for pools to be created with this configuration instance.
@@ -230,7 +253,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 
@@ -244,6 +267,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
           ", minIdle=" + minIdle +
           ", connectionTimeout=" + connectionTimeout +
           ", password=" + password +
+          ", ssl=" + ssl +
           ", testOnBorrow=" + testOnBorrow +
           ", testOnReturn=" + testOnReturn +
           ", testWhileIdle=" + testWhileIdle +
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0ffbbe0..7f5af3d 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -92,7 +92,8 @@ public class RedisCommandsContainerBuilder {
           jedisClusterConfig.getMaxRedirections(),
           jedisClusterConfig.getPassword(),
           DEFAULT_CLIENT_NAME,
-          genericObjectPoolConfig);
+          genericObjectPoolConfig,
+          jedisClusterConfig.getSsl());
         return new RedisClusterContainer(jedisCluster);
     }
 
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index d64be84..76208b9 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -25,6 +25,8 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class JedisClusterConfigTest extends TestLogger {
 
@@ -81,4 +83,35 @@ public class JedisClusterConfigTest extends TestLogger {
         assertNull(clusterConfig.getPassword());
     }
 
+    @Test
+    public void shouldSetSslSuccessfully() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .setSsl(true)
+                .build();
+        assertTrue(clusterConfig.getSsl());
+    }
+
+    @Test
+    public void shouldSslNotBeenSet() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .build();
+        assertFalse(clusterConfig.getSsl());
+    }
+
 }


[bahir-flink] 01/02: [BAHIR-315] update jedis dependency to 4.3.0

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit ef3d18b07ca5ab2d4b653171777122ae257e747c
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:43:55 2022 +0200

    [BAHIR-315] update jedis dependency to 4.3.0
---
 flink-connector-redis/pom.xml                      |  2 +-
 .../common/container/RedisClusterContainer.java    |  9 +-------
 .../container/RedisCommandsContainerBuilder.java   | 14 ++++++++-----
 .../connectors/redis/RedisSinkITCase.java          | 24 +++++++++++-----------
 .../streaming/connectors/redis/RedisSinkTest.java  |  4 ++--
 .../RedisCommandsContainerBuilderTest.java         |  7 ++++---
 6 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index d7d67de..71c5eff 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>flink-connector-redis</name>
 
     <properties>
-        <jedis.version>2.9.0</jedis.version>
+        <jedis.version>4.3.0</jedis.version>
     </properties>
 
     <dependencyManagement>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index d61716b..d2aa13e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -47,14 +47,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
     }
 
     @Override
-    public void open() throws Exception {
-
-        // echo() tries to open a connection and echos back the
-        // message passed as argument. Here we use it to monitor
-        // if we can communicate with the cluster.
-
-        jedisCluster.echo("Test");
-    }
+    public void open() throws Exception {}
 
     @Override
     public void hset(final String key, final String hashField, final String value, final Integer ttl) {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index b06a6e9..0ffbbe0 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -25,6 +25,8 @@ import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.JedisPoolConfig;
 import redis.clients.jedis.JedisSentinelPool;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Connection;
 
 import java.util.Objects;
 
@@ -32,6 +34,7 @@ import java.util.Objects;
  * The builder for {@link RedisCommandsContainer}.
  */
 public class RedisCommandsContainerBuilder {
+    private static final String DEFAULT_CLIENT_NAME = "default_client";
 
     /**
      * Initialize the {@link RedisCommandsContainer} based on the instance type.
@@ -63,7 +66,7 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
         Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
           jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
@@ -81,13 +84,14 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
         Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
+        GenericObjectPoolConfig<Connection> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
 
         JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
           jedisClusterConfig.getConnectionTimeout(),
           jedisClusterConfig.getConnectionTimeout(),
           jedisClusterConfig.getMaxRedirections(),
           jedisClusterConfig.getPassword(),
+          DEFAULT_CLIENT_NAME,
           genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
@@ -102,7 +106,7 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
         Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
 
         JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
           jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
@@ -111,8 +115,8 @@ public class RedisCommandsContainerBuilder {
         return new RedisContainer(jedisSentinelPool);
     }
 
-    public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
-        GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
+    public static <T> GenericObjectPoolConfig<T> getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+        GenericObjectPoolConfig<T> genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? (GenericObjectPoolConfig<T>) new JedisPoolConfig() : new GenericObjectPoolConfig<T>();
         genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
         genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index ee1cc7f..5858eb1 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -69,7 +69,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis List Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.llen(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -83,7 +83,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Set Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -97,8 +97,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Set Data Type With TTL");
 
-        assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+        assertEquals(TEST_MESSAGE_LENGTH.longValue(), jedis.strlen(REDIS_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -126,7 +126,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisZaddSink);
         env.execute("Test ZADD");
 
-        assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
 
         RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
                 new RedisAdditionalDataMapper(RedisCommand.ZREM));
@@ -134,7 +134,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisZremSink);
         env.execute("Test ZREM");
 
-        assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
+        assertEquals(ZERO.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -148,8 +148,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -163,8 +163,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -178,8 +178,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type 2");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 0ec4cd5..338e14e 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDes
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
-import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
 
 import java.net.InetSocketAddress;
 import java.util.HashSet;
@@ -110,7 +110,7 @@ public class RedisSinkTest extends TestLogger {
 
             Throwable t = e;
             int depth = 0;
-            while (!(t instanceof JedisConnectionException)) {
+            while (!(t instanceof JedisException)) {
                 t = t.getCause();
                 if (t == null || depth++ == 20) {
                     throw e;
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
index eac5ca0..2b89ec6 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -25,13 +25,14 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolC
 import org.apache.flink.test.util.AbstractTestBase;
 import org.junit.Test;
 import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Jedis;
 
 public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
 
     @Test
     public void testNotTestWhileIdle() {
         FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
-        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
         assertFalse(genericObjectPoolConfig.getTestWhileIdle());
         assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
     }
@@ -39,7 +40,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
     @Test
     public void testTestWhileIdle() {
         FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
-        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
         assertTrue(genericObjectPoolConfig.getTestWhileIdle());
         assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
 
@@ -49,7 +50,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
         assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
     }
 
-    private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+    private <T> void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig<T> genericObjectPoolConfig) {
         assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
         assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
         assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());