Posted to users@kafka.apache.org by Sathya Murthy <sm...@gmail.com> on 2021/01/09 09:41:31 UTC

kafka stream processor's process method

Hi there,
I'm Sathya.
I have the requirements below in my project; please let me know how to
achieve them.


These are my two Kafka Streams classes:

1. MySource

2. MyProcessor

The MySource class sends a continuous stream of data, which is received in the
process method of the MyProcessor class.

My requirements are

1) After each message is processed inside the process method, I need to send
a response back to the MySource class (either SUCCESS or FAILED).

2) When processing is unsuccessful, for example when an exception is thrown
while invoking the service call (newApplication.service(value);), the process
method should stop consuming any further messages to prevent data loss.

Could you please help me with this?

1) MySource class

new StreamsBuilder().build()
    .addSource(READ_FROM_TOPIC,
        Serdes.String().deserializer(), Serdes.String().deserializer(), messages)
    .addProcessor(TRAN_PROCESSOR, () -> new MyProcessor(), READ_FROM_TOPIC);

2) MyProcessor class

public class MyProcessor implements Processor<String, String> {

    public void process(String key, String value) {
        try {
            newApplication.service(value);
        } catch (Exception e) {
            // the exception is swallowed here
        }
    }
}

Re: kafka stream processor's process method

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Sathya,

MyProcessor does not have access to MySource, because in MySource you 
just build the topology that is then executed by Kafka Streams. So you 
cannot send anything to MySource, because MyProcessor does not know 
anything about MySource.

If you want to stop consumption upon an exception from your service, you 
throw that exception in process(). That would stop the stream thread on 
which the processor is executed. Other running stream threads on the 
same client and other Streams clients in your Streams application are 
not affected by this exception. If you want to shut down the Streams
client on which the stream thread that throws the exception runs, you
need to pass a reference to your Streams client (i.e., a reference to the
KafkaStreams object) to the uncaught exception handler that you can set
with KafkaStreams#setUncaughtExceptionHandler(), and in the uncaught
exception handler you need to call KafkaStreams#close(Duration.ZERO).
Make sure you call close() with Duration.ZERO, since otherwise you might
run into a deadlock.
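
As a rough illustration of the behaviour described above, here is a minimal
plain-Java sketch that does not use Kafka at all. It only models the
mechanism: process() rethrows instead of swallowing the exception, the thread
running it dies and processes nothing further, and an uncaught exception
handler observes the failure. The class StopOnFailureSketch, the service()
stub, the key "k" and the value "bad" are hypothetical names made up for this
sketch. In a real application the handler would be registered with
KafkaStreams#setUncaughtExceptionHandler() and would call
KafkaStreams#close(Duration.ZERO).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class StopOnFailureSketch {

    // Hypothetical stand-in for newApplication.service(value): fails on "bad".
    static void service(String value) throws Exception {
        if ("bad".equals(value)) {
            throw new Exception("service rejected " + value);
        }
    }

    // Mirrors process(): rethrow instead of swallowing, so the thread stops.
    static void process(String key, String value) {
        try {
            service(value);
        } catch (Exception e) {
            throw new RuntimeException("processing failed for key " + key, e);
        }
    }

    // Processes the values on one worker thread (a stand-in for a stream thread)
    // and returns the values that completed before any failure.
    static List<String> run(List<String> values, AtomicReference<Throwable> failure) {
        List<String> processed = new ArrayList<>();
        Thread worker = new Thread(() -> {
            for (String value : values) {
                process("k", value);
                processed.add(value);
            }
        });
        // Assumption: in Kafka Streams, the handler would call
        // KafkaStreams#close(Duration.ZERO) here instead of recording the error.
        worker.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        List<String> processed = run(Arrays.asList("a", "b", "bad", "c"), failure);
        // prints: processed=[a, b] failure=processing failed for key k
        System.out.println("processed=" + processed + " failure=" + failure.get().getMessage());
    }
}
```

The value "c" is never processed because the worker thread terminates on the
first failure, which is the "stop consuming" behaviour asked for in the
question. The sketch says nothing about offsets or restarts; those depend on
the Kafka Streams configuration.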

We are currently developing a more sophisticated way to react to
exceptions that would also allow you to shut down your whole Streams
application (i.e., close all KafkaStreams objects) upon an exception. See
more details under https://cwiki.apache.org/confluence/x/lkN4CQ

Best,
Bruno


