Posted to user@ignite.apache.org by "facundo.maldonado" <ma...@gmail.com> on 2020/11/12 18:57:58 UTC

KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Hi all, I'm having some problems dealing with the KafkaStreamer.

I have a deployment with a streamer (client node) that consumes records from
a Kafka topic, and a data node (cache storage).

If for any reason the cache node crashes or simply restarts, the client
node gets disconnected, but the KafkaStreamer keeps pulling records and
trying to push them to the cache.

Is there a recommended way to stop the KafkaStreamer on client disconnection
and resume once the connection is re-established?




Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Posted by akorensh <al...@gmail.com>.
flush() guarantees completion of all futures returned by addData(Object,
Object)
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteDataStreamer.html#flush--

flush() will send the batch, but it is still possible for the server to
crash before the message reaches it.

If you need to verify whether a particular put actually made it to the
server, appropriate events are available.
Use these events:
https://ignite.apache.org/docs/latest/events/events#cache-events
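
A rough sketch of subscribing to those events on the nodes that host the
cache (it assumes a cache named "myCache" and that EVT_CACHE_OBJECT_PUT has
been enabled in the node configuration via setIncludeEventTypes; it is not a
drop-in solution):

import java.util.UUID;

import org.apache.ignite.Ignite;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgniteBiPredicate;

public class PutEventListener {
    // Listens on the nodes that host "myCache" for put events and reports
    // them back to this (local) node.
    public static void register(Ignite ignite) {
        ignite.events(ignite.cluster().forCacheNodes("myCache")).remoteListen(
            (IgniteBiPredicate<UUID, CacheEvent>) (nodeId, evt) -> {
                System.out.println("Put confirmed for key: " + evt.key());
                return true;   // keep receiving notifications
            },
            evt -> true,       // remote filter: pass every put event through
            EventType.EVT_CACHE_OBJECT_PUT);
    }
}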

You can add retry logic for the case where the client has disconnected.

One possibility is to use the CacheException thrown per the contract of the
flush() method; you will get something like this:
javax.cache.CacheException: class org.apache.ignite.IgniteClientDisconnectedException: Data streamer has been closed, client node disconnected.
	at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1275)
	at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.doFlush(DataStreamerImpl.java:1204)
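
For completeness, a minimal sketch of catching that (the class and method
names here are mine, not part of any Ignite API):

import javax.cache.CacheException;

import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;

public class FlushCheck {
    // Returns false if the flush failed because the client got disconnected,
    // so the caller can stop the KafkaStreamer and retry later.
    public static boolean flushedOk(IgniteDataStreamer<?, ?> streamer) {
        try {
            streamer.flush();
            return true;
        }
        catch (CacheException e) {
            if (e.getCause() instanceof IgniteClientDisconnectedException)
                return false;
            throw e;
        }
    }
}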







Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Posted by Pelado <ma...@gmail.com>.
Thanks for the answers.
I resolved the problem of reconnection using events. It worked very well.
What I found is the following:
The KafkaStreamer consumes records and sends them to the IgniteDataStreamer.
It doesn't handle the IgniteFuture that is returned.
If the connection with the server is interrupted (a server restart, for
example), the KafkaStreamer is stopped and the Kafka consumers are stopped,
but the records that were already sent to the streamer and (I believe) are
sitting in its buffer still try to be saved in the cache.
There is no way to recover them as far as I know.
Am I right?
Should I implement a custom KafkaStreamer that, in that situation, handles
the IgniteFuture and, let's say, retries the insertion into the cache?
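
If I went down that road, I imagine the per-record handling would look
roughly like this (just a sketch; retryQueue is a hypothetical local buffer,
and key/value stand for the data extracted from each Kafka record):

import java.util.AbstractMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.lang.IgniteFuture;

public class RetryingPut {
    // Hypothetical buffer for records that never made it into the cache.
    private final Queue<Map.Entry<Object, Object>> retryQueue = new ConcurrentLinkedQueue<>();

    public void put(IgniteDataStreamer<Object, Object> streamer, Object key, Object value) {
        IgniteFuture<?> fut = streamer.addData(key, value);

        fut.listen(f -> {
            try {
                f.get();   // rethrows if the batch ultimately failed
            }
            catch (IgniteException e) {
                // Keep the record locally and replay it once the client reconnects.
                retryQueue.add(new AbstractMap.SimpleEntry<>(key, value));
            }
        });
    }
}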

Another question: I'm using a grid service to start the streamer. What is
the benefit of this vs. a simple Spring service if I'm using Kubernetes for
deployment?




On Fri, Nov 20, 2020 at 5:01 PM akorensh <al...@gmail.com> wrote:

> Hi,
>   I think listening to events would be a good solution for you.
>
> There are two discovery events that are triggered on the client node when
> it
> is disconnected from or reconnected to the cluster:
>
> EVT_CLIENT_NODE_DISCONNECTED
>
> EVT_CLIENT_NODE_RECONNECTED
>
> see:
>
> https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events
>
>
> As for StreamReceiver: Keep in mind that the logic implemented in a stream
> receiver is executed on the node where data is to be stored.  If the server
> where the data resides crashes, your code might not execute.
> https://ignite.apache.org/docs/latest/data-streaming#stream-visitor
>
> Thanks, Alex
>
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>


-- 
Facundo Maldonado

Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Posted by akorensh <al...@gmail.com>.
Hi,
  I think listening to events would be a good solution for you. 

There are two discovery events that are triggered on the client node when it
is disconnected from or reconnected to the cluster:

EVT_CLIENT_NODE_DISCONNECTED

EVT_CLIENT_NODE_RECONNECTED

see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events
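
For example, something along these lines (only a sketch; it assumes a
stopped KafkaStreamer can simply be started again. If not, rebuild it, and
its IgniteDataStreamer, before restarting, since the data streamer is closed
on disconnect):

import org.apache.ignite.Ignite;
import org.apache.ignite.events.EventType;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class DisconnectAwareStreamer {
    private final Ignite ignite;
    private final KafkaStreamer<String, String> kafkaStreamer;

    public DisconnectAwareStreamer(Ignite ignite, KafkaStreamer<String, String> kafkaStreamer) {
        this.ignite = ignite;
        this.kafkaStreamer = kafkaStreamer;
    }

    public void registerListener() {
        ignite.events().localListen(evt -> {
            if (evt.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
                kafkaStreamer.stop();    // pause Kafka consumption while disconnected
            else if (evt.type() == EventType.EVT_CLIENT_NODE_RECONNECTED)
                kafkaStreamer.start();   // resume once the client rejoins the cluster
            return true;                 // keep the listener registered
        }, EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED);
    }
}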


As for StreamReceiver: Keep in mind that the logic implemented in a stream
receiver is executed on the node where data is to be stored.  If the server
where the data resides crashes, your code might not execute.
https://ignite.apache.org/docs/latest/data-streaming#stream-visitor

Thanks, Alex






Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Posted by "facundo.maldonado" <ma...@gmail.com>.
I forgot to mention that I'm starting the KafkaStreamer in a cluster service.
Pretty similar to all the examples that are around. 

I saw the exception in the documentation; my concern is where I should catch
it, given that I initialize and set up the streamer in the init() method and
start it in execute(). Should I create a custom implementation of a
StreamReceiver (holding a reference to the KafkaStreamer) that actually calls
the cache.put() method, catches the exception, and stops the streamer?

I didn't take the event approach into account; the solution may be down that
path. I think it is valid to add a listener in the service init() method,
right?
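
To make it concrete, this is roughly the shape of what I mean (a skeleton
only; the class name is made up and the Kafka topic, consumer properties and
IgniteDataStreamer wiring are elided):

import org.apache.ignite.Ignite;
import org.apache.ignite.events.EventType;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class KafkaStreamerService implements Service {
    @IgniteInstanceResource
    private transient Ignite ignite;

    private transient KafkaStreamer<String, String> kafkaStreamer;

    @Override public void init(ServiceContext ctx) {
        kafkaStreamer = new KafkaStreamer<>();
        // ... topic, consumer properties and IgniteDataStreamer wiring elided ...

        // Register the disconnect listener once, during deployment.
        ignite.events().localListen(evt -> {
            if (evt.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
                kafkaStreamer.stop();
            return true;
        }, EventType.EVT_CLIENT_NODE_DISCONNECTED);
    }

    @Override public void execute(ServiceContext ctx) {
        kafkaStreamer.start();   // begin consuming from Kafka
    }

    @Override public void cancel(ServiceContext ctx) {
        kafkaStreamer.stop();    // service undeployed: stop the consumers cleanly
    }
}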

Thanks Alex for your answer.




Re: KafkaStreamer, how to manage (stop consuming, resume) on client disconnection

Posted by akorensh <al...@gmail.com>.
Hi, 
  You can use the disconnect events or the exception, and then call KafkaStreamer.stop().
  
  see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events

  https://ignite.apache.org/docs/latest/clustering/connect-client-nodes 
  Here, look for: "While a client is in a disconnected state and an attempt to
reconnect is in progress, the Ignite API throws an
IgniteClientDisconnectedException."
   

  KafkaStreamer stop method:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/stream/kafka/KafkaStreamer.html#stop--
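
  Putting these together, roughly (only a sketch; rebuilding the data
streamer and restarting the KafkaStreamer after reconnect is left to you):

import javax.cache.CacheException;

import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class ReconnectAwareFlush {
    public static void flushOrWait(IgniteDataStreamer<?, ?> streamer, KafkaStreamer<?, ?> kafkaStreamer) {
        try {
            streamer.flush();   // any Ignite API call can surface the disconnect
        }
        catch (CacheException e) {
            if (e.getCause() instanceof IgniteClientDisconnectedException) {
                IgniteClientDisconnectedException cause =
                    (IgniteClientDisconnectedException)e.getCause();

                kafkaStreamer.stop();             // stop pulling from Kafka
                cause.reconnectFuture().get();    // block until the client rejoins
                // ... rebuild the IgniteDataStreamer and restart the KafkaStreamer ...
            }
            else
                throw e;
        }
    }
}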
 
Thanks, Alex


