You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/10/14 15:21:19 UTC

[bahir-flink] 01/02: [BAHIR-315] update jedis dependency to 4.3.0

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit ef3d18b07ca5ab2d4b653171777122ae257e747c
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:43:55 2022 +0200

    [BAHIR-315] update jedis dependency to 4.3.0
---
 flink-connector-redis/pom.xml                      |  2 +-
 .../common/container/RedisClusterContainer.java    |  9 +-------
 .../container/RedisCommandsContainerBuilder.java   | 14 ++++++++-----
 .../connectors/redis/RedisSinkITCase.java          | 24 +++++++++++-----------
 .../streaming/connectors/redis/RedisSinkTest.java  |  4 ++--
 .../RedisCommandsContainerBuilderTest.java         |  7 ++++---
 6 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index d7d67de..71c5eff 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>flink-connector-redis</name>
 
     <properties>
-        <jedis.version>2.9.0</jedis.version>
+        <jedis.version>4.3.0</jedis.version>
     </properties>
 
     <dependencyManagement>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index d61716b..d2aa13e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -47,14 +47,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
     }
 
     @Override
-    public void open() throws Exception {
-
-        // echo() tries to open a connection and echos back the
-        // message passed as argument. Here we use it to monitor
-        // if we can communicate with the cluster.
-
-        jedisCluster.echo("Test");
-    }
+    public void open() throws Exception {}
 
     @Override
     public void hset(final String key, final String hashField, final String value, final Integer ttl) {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index b06a6e9..0ffbbe0 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -25,6 +25,8 @@ import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.JedisPoolConfig;
 import redis.clients.jedis.JedisSentinelPool;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Connection;
 
 import java.util.Objects;
 
@@ -32,6 +34,7 @@ import java.util.Objects;
  * The builder for {@link RedisCommandsContainer}.
  */
 public class RedisCommandsContainerBuilder {
+    private static final String DEFAULT_CLIENT_NAME = "default_client";
 
     /**
      * Initialize the {@link RedisCommandsContainer} based on the instance type.
@@ -63,7 +66,7 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
         Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
           jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
@@ -81,13 +84,14 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
         Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
+        GenericObjectPoolConfig<Connection> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
 
         JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
           jedisClusterConfig.getConnectionTimeout(),
           jedisClusterConfig.getConnectionTimeout(),
           jedisClusterConfig.getMaxRedirections(),
           jedisClusterConfig.getPassword(),
+          DEFAULT_CLIENT_NAME,
           genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
@@ -102,7 +106,7 @@ public class RedisCommandsContainerBuilder {
     public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
         Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
 
-        GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
 
         JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
           jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
@@ -111,8 +115,8 @@ public class RedisCommandsContainerBuilder {
         return new RedisContainer(jedisSentinelPool);
     }
 
-    public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
-        GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
+    public static <T> GenericObjectPoolConfig<T> getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+        GenericObjectPoolConfig<T> genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? (GenericObjectPoolConfig<T>) new JedisPoolConfig() : new GenericObjectPoolConfig<T>();
         genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
         genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index ee1cc7f..5858eb1 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -69,7 +69,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis List Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.llen(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -83,7 +83,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Set Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -97,8 +97,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Set Data Type With TTL");
 
-        assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+        assertEquals(TEST_MESSAGE_LENGTH.longValue(), jedis.strlen(REDIS_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_KEY));
 
         jedis.del(REDIS_KEY);
     }
@@ -126,7 +126,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisZaddSink);
         env.execute("Test ZADD");
 
-        assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
 
         RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
                 new RedisAdditionalDataMapper(RedisCommand.ZREM));
@@ -134,7 +134,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisZremSink);
         env.execute("Test ZREM");
 
-        assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
+        assertEquals(ZERO.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -148,8 +148,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -163,8 +163,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
@@ -178,8 +178,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
         source.addSink(redisSink);
         env.execute("Test Redis Hash Data Type 2");
 
-        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-        assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+        assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+        assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
         jedis.del(REDIS_ADDITIONAL_KEY);
     }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 0ec4cd5..338e14e 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDes
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
-import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
 
 import java.net.InetSocketAddress;
 import java.util.HashSet;
@@ -110,7 +110,7 @@ public class RedisSinkTest extends TestLogger {
 
             Throwable t = e;
             int depth = 0;
-            while (!(t instanceof JedisConnectionException)) {
+            while (!(t instanceof JedisException)) {
                 t = t.getCause();
                 if (t == null || depth++ == 20) {
                     throw e;
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
index eac5ca0..2b89ec6 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -25,13 +25,14 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolC
 import org.apache.flink.test.util.AbstractTestBase;
 import org.junit.Test;
 import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Jedis;
 
 public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
 
     @Test
     public void testNotTestWhileIdle() {
         FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
-        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
         assertFalse(genericObjectPoolConfig.getTestWhileIdle());
         assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
     }
@@ -39,7 +40,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
     @Test
     public void testTestWhileIdle() {
         FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
-        GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
         assertTrue(genericObjectPoolConfig.getTestWhileIdle());
         assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
 
@@ -49,7 +50,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
         assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
     }
 
-    private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+    private <T> void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig<T> genericObjectPoolConfig) {
         assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
         assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
         assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());