Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/10/15 11:27:29 UTC

Correct way to handle RedisSink exception

Hi,
I have a streaming application that pushes output to a Redis cluster sink.
I am using the Apache Bahir[1] Flink Redis connector for this. I want to
handle the case when the Redis server is unavailable.
I am following the pattern outlined in the Bahir documentation [1]:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(
        new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf,
    new RedisExampleMapper()));

However, if the redis server is not available, my whole job crashes with
this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.util.Pool.getResource(Pool.java:53)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
    ...

I want to handle and ignore such exceptions thrown by the RedisSink class.
Where exactly do I put my try/catch to do this? Enclosing the last line of
the code snippet in try/catch does not work.
I believe the only way to do this would be to handle the exception in the
RedisSink class, but that is a library class provided by Bahir. Is my
thinking correct?


[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas

Re: Correct way to handle RedisSink exception

Posted by Chesnay Schepler <ch...@apache.org>.
You will have to create a custom version of the redis connector that 
ignores such exceptions.




Re: Correct way to handle RedisSink exception

Posted by Manas Kale <ma...@gmail.com>.
Hi all,
Thank you for the help, I understand now.


Re: Correct way to handle RedisSink exception

Posted by 阮树斌 浙江大学 <zj...@163.com>.
Hello Manas Kale,

The log shows that the exception was thrown in the 'open()' method of the RedisSink class. You can extend the RedisSink class, override the 'open()' method, and handle the exception as you wish. Alternatively, stop using the Apache Bahir[1] Flink Redis connector library and extend RichSinkFunction to develop a custom RedisSink class.


Regards
Shubin Ruan
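
For readers who want to see the shape of the second suggestion, here is a minimal sketch of a sink that tolerates an unavailable server. It is not the Bahir implementation. To keep it compilable without Flink or Jedis on the classpath, KeyValueClient, ClientFactory and TolerantSink are hypothetical stand-ins: in a real job the class would extend RichSinkFunction, open() would take a Configuration, and the factory would create a Jedis client. Note that swallowing the exception means records are silently dropped while Redis is down, so this only fits jobs where that loss is acceptable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Redis client; not part of Jedis or Bahir. */
interface KeyValueClient {
    void set(String key, String value) throws Exception;
}

/** Hypothetical factory that may throw when the server is unreachable. */
interface ClientFactory {
    KeyValueClient connect() throws Exception;
}

/**
 * Mirrors the open()/invoke() lifecycle of a rich sink function, but
 * swallows connection and write failures instead of rethrowing them.
 */
class TolerantSink {
    private final ClientFactory factory;
    private KeyValueClient client; // null while disconnected
    private long dropped;          // records discarded so far

    TolerantSink(ClientFactory factory) {
        this.factory = factory;
    }

    /** Called once at startup: try to connect, but never throw. */
    void open() {
        tryConnect();
    }

    /** Called per record: write if possible, otherwise count and drop. */
    void invoke(String key, String value) {
        if (client == null && !tryConnect()) {
            dropped++;
            return;
        }
        try {
            client.set(key, value);
        } catch (Exception e) {
            client = null; // force a reconnect attempt on the next record
            dropped++;
        }
    }

    long droppedCount() {
        return dropped;
    }

    private boolean tryConnect() {
        try {
            client = factory.connect();
            return true;
        } catch (Exception e) {
            client = null;
            return false;
        }
    }
}

class TolerantSinkDemo {
    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        boolean[] serverUp = {false};
        ClientFactory factory = () -> {
            if (!serverUp[0]) {
                throw new IllegalStateException("server unavailable");
            }
            return (k, v) -> store.add(k + "=" + v);
        };

        TolerantSink sink = new TolerantSink(factory);
        sink.open();            // server down: no exception escapes
        sink.invoke("a", "1");  // dropped, because reconnecting fails
        serverUp[0] = true;
        sink.invoke("b", "2");  // reconnects lazily and writes
        System.out.println(store + " dropped=" + sink.droppedCount());
        // prints "[b=2] dropped=1"
    }
}
```

The same caveat applies to the first suggestion: a subclass that only swallows the exception in 'open()' may still fail later in 'invoke()' if the connection was never established, so the per-record path needs a guard as well.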
