You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/10/14 15:21:19 UTC
[bahir-flink] 01/02: [BAHIR-315] update jedis dependency to 4.3.0
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit ef3d18b07ca5ab2d4b653171777122ae257e747c
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:43:55 2022 +0200
[BAHIR-315] update jedis dependency to 4.3.0
---
flink-connector-redis/pom.xml | 2 +-
.../common/container/RedisClusterContainer.java | 9 +-------
.../container/RedisCommandsContainerBuilder.java | 14 ++++++++-----
.../connectors/redis/RedisSinkITCase.java | 24 +++++++++++-----------
.../streaming/connectors/redis/RedisSinkTest.java | 4 ++--
.../RedisCommandsContainerBuilderTest.java | 7 ++++---
6 files changed, 29 insertions(+), 31 deletions(-)
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index d7d67de..71c5eff 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>flink-connector-redis</name>
<properties>
- <jedis.version>2.9.0</jedis.version>
+ <jedis.version>4.3.0</jedis.version>
</properties>
<dependencyManagement>
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index d61716b..d2aa13e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -47,14 +47,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
}
@Override
- public void open() throws Exception {
-
- // echo() tries to open a connection and echos back the
- // message passed as argument. Here we use it to monitor
- // if we can communicate with the cluster.
-
- jedisCluster.echo("Test");
- }
+ public void open() throws Exception {}
@Override
public void hset(final String key, final String hashField, final String value, final Integer ttl) {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index b06a6e9..0ffbbe0 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -25,6 +25,8 @@ import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Connection;
import java.util.Objects;
@@ -32,6 +34,7 @@ import java.util.Objects;
* The builder for {@link RedisCommandsContainer}.
*/
public class RedisCommandsContainerBuilder {
+ private static final String DEFAULT_CLIENT_NAME = "default_client";
/**
* Initialize the {@link RedisCommandsContainer} based on the instance type.
@@ -63,7 +66,7 @@ public class RedisCommandsContainerBuilder {
public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
- GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
+ GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
@@ -81,13 +84,14 @@ public class RedisCommandsContainerBuilder {
public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
- GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
+ GenericObjectPoolConfig<Connection> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
jedisClusterConfig.getConnectionTimeout(),
jedisClusterConfig.getConnectionTimeout(),
jedisClusterConfig.getMaxRedirections(),
jedisClusterConfig.getPassword(),
+ DEFAULT_CLIENT_NAME,
genericObjectPoolConfig);
return new RedisClusterContainer(jedisCluster);
}
@@ -102,7 +106,7 @@ public class RedisCommandsContainerBuilder {
public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
- GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
+ GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
@@ -111,8 +115,8 @@ public class RedisCommandsContainerBuilder {
return new RedisContainer(jedisSentinelPool);
}
- public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
- GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
+ public static <T> GenericObjectPoolConfig<T> getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+ GenericObjectPoolConfig<T> genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? (GenericObjectPoolConfig<T>) new JedisPoolConfig() : new GenericObjectPoolConfig<T>();
genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index ee1cc7f..5858eb1 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -69,7 +69,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis List Data Type");
- assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.llen(REDIS_KEY));
jedis.del(REDIS_KEY);
}
@@ -83,7 +83,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis Set Data Type");
- assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY));
jedis.del(REDIS_KEY);
}
@@ -97,8 +97,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis Set Data Type With TTL");
- assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY));
- assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY));
+ assertEquals(TEST_MESSAGE_LENGTH.longValue(), jedis.strlen(REDIS_KEY));
+ assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_KEY));
jedis.del(REDIS_KEY);
}
@@ -126,7 +126,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisZaddSink);
env.execute("Test ZADD");
- assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.ZREM));
@@ -134,7 +134,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisZremSink);
env.execute("Test ZREM");
- assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
+ assertEquals(ZERO.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
@@ -148,8 +148,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis Hash Data Type");
- assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
- assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
@@ -163,8 +163,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis Hash Data Type");
- assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
- assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
@@ -178,8 +178,8 @@ public class RedisSinkITCase extends RedisITCaseBase {
source.addSink(redisSink);
env.execute("Test Redis Hash Data Type 2");
- assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
- assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY));
+ assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+ assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
jedis.del(REDIS_ADDITIONAL_KEY);
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 0ec4cd5..338e14e 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDes
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisException;
import java.net.InetSocketAddress;
import java.util.HashSet;
@@ -110,7 +110,7 @@ public class RedisSinkTest extends TestLogger {
Throwable t = e;
int depth = 0;
- while (!(t instanceof JedisConnectionException)) {
+ while (!(t instanceof JedisException)) {
t = t.getCause();
if (t == null || depth++ == 20) {
throw e;
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
index eac5ca0..2b89ec6 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -25,13 +25,14 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolC
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;
import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Jedis;
public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
@Test
public void testNotTestWhileIdle() {
FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
- GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
assertFalse(genericObjectPoolConfig.getTestWhileIdle());
assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
}
@@ -39,7 +40,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
@Test
public void testTestWhileIdle() {
FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
- GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
assertTrue(genericObjectPoolConfig.getTestWhileIdle());
assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
@@ -49,7 +50,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
}
- private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+ private <T> void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig<T> genericObjectPoolConfig) {
assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());