Posted to dev@bahir.apache.org by "João Boto (Jira)" <ji...@apache.org> on 2022/08/05 19:20:00 UTC

[jira] [Assigned] (BAHIR-306) Please release a new version of flink-connector-redis_2.11 to fix the Jedis bug

     [ https://issues.apache.org/jira/browse/BAHIR-306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

João Boto reassigned BAHIR-306:
-------------------------------

    Assignee: João Boto

> Please release a new version of flink-connector-redis_2.11 to fix the Jedis bug
> -------------------------------------------------------------------------------
>
>                 Key: BAHIR-306
>                 URL: https://issues.apache.org/jira/browse/BAHIR-306
>             Project: Bahir
>          Issue Type: Wish
>          Components: Flink Streaming Connectors
>            Reporter: Yang Bodong
>            Assignee: João Boto
>            Priority: Blocker
>
> It has been 5 years since the latest `flink-connector-redis_2.11` release [https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis_2.11]. We recently encountered the following error after upgrading our Redis version:
>  
> {code:java}
> java.lang.ExceptionInInitializerError
>   at com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:41)
>   at com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:16)
>   at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>   at com.ymm.realtime.KafkaBooStrap.lambda$main$f53f5b13$1(KafkaBooStrap.java:52)
>   at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.lang.NumberFormatException: For input string: "6379@13028"
>   at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>   at java.lang.Integer.parseInt(Integer.java:580)
>   at java.lang.Integer.valueOf(Integer.java:766)
>   at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39)
>   at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14)
>   at redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50)
>   at redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39)
>   at redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28)
>   at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21)
>   at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:46)
>   at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:50)
>   at com.ymm.realtime.util.RedisUtils.initJedisCluster(RedisUtils.java:36)
>   at com.ymm.realtime.util.RedisUtils.<clinit>(RedisUtils.java:25) {code}
>  
> I have confirmed that the problem occurs because `flink-connector-redis_2.11` depends on `Jedis` version `2.8.0`, whereas `Jedis` [fixed this problem|https://github.com/redis/jedis/issues/1958] in `2.9.0`. The latest code on GitHub shows that `flink-connector-redis_2.11` has also updated its `Jedis` dependency [https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/pom.xml#L37], but I cannot find a newer package on mvn. What did I miss?
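
For illustration: the `NumberFormatException` in the trace is consistent with the `CLUSTER NODES` address field carrying a cluster bus port after the client port (`ip:port@cport`, the form Redis 4.0 and later report). Below is a minimal Java sketch of a parser that tolerates that suffix. The class and method names are hypothetical, the IP address is made up, and this is not the actual Jedis fix, only a way to reproduce and reason about the failure.

```java
// Hypothetical helper for illustration; not part of Jedis or the Bahir connector.
final class ClusterNodePortSketch {

    // Extracts the client port from the address field of a CLUSTER NODES line.
    // Assumes the field looks like "ip:port" or "ip:port@cport".
    static int parseClientPort(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("no port in address: " + address);
        }
        String portPart = address.substring(colon + 1);
        int at = portPart.indexOf('@');
        if (at >= 0) {
            // Drop the cluster bus port that follows the '@'.
            portPart = portPart.substring(0, at);
        }
        return Integer.parseInt(portPart);
    }

    // Mimics the failing behaviour: treat everything after the last ':' as an int.
    static boolean naiveParseFails(String address) {
        try {
            Integer.valueOf(address.substring(address.lastIndexOf(':') + 1));
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // "6379@13028" is the string from the stack trace; the IP is an assumption.
        String address = "127.0.0.1:6379@13028";
        System.out.println("naive parse fails: " + naiveParseFails(address));
        System.out.println("client port: " + parseClientPort(address));
    }
}
```

The sketch only strips the suffix; whether a given Jedis release handles the field this way should be checked against the Jedis source.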



--
This message was sent by Atlassian Jira
(v8.20.10#820010)