Posted to users@kafka.apache.org by Duy Truong <du...@gmail.com> on 2017/08/17 16:11:46 UTC

Continue to consume messages when exception occurs in Kafka Stream

Hi everyone,

My Kafka Streams app throws an exception (my own business exception), and then it
stops consuming messages. Is there any way to make my app continue
consuming messages when the exception occurs?

Thanks

-- 
*Duy Truong*

Re: Continue to consume messages when exception occurs in Kafka Stream

Posted by Eno Thereska <en...@gmail.com>.
Hi Duy,

What kind of exception are you getting? With KIP-161 (checked into trunk) we allow log-and-skip type exception handlers for deserialization errors: https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
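
As a rough illustration of the log-and-skip handler, here is a configuration sketch in Scala. It assumes the `default.deserialization.exception.handler` key and the `LogAndContinueExceptionHandler` class as they shipped with KIP-161 in Kafka 1.0 (later releases may rename the key, so check `StreamsConfig` for your version). The application id and broker address are hypothetical placeholders.

```scala
import java.util.Properties

object SkipBadRecordsConfig {
  // Builds Kafka Streams properties that log and skip records which
  // fail deserialization, instead of stopping the stream thread.
  def streamsProps(): Properties = {
    val props = new Properties()
    props.put("application.id", "my-streams-app")    // hypothetical name
    props.put("bootstrap.servers", "localhost:9092") // hypothetical address
    // Assumed key and class name from KIP-161 (Kafka 1.0).
    props.put(
      "default.deserialization.exception.handler",
      "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
    )
    props
  }
}
```

This only covers records that cannot be deserialized; exceptions thrown from user code inside the topology are not affected by it.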

Is yours a deserialization exception or a higher-level exception in your code? From your email I think it's the latter. Could you describe the scenario a bit more? We don't currently have a way to skip the latter kind of error, but we would be interested in collecting more info from users.

Thanks
Eno

> On Aug 17, 2017, at 5:11 PM, Duy Truong <du...@gmail.com> wrote:
> 
> Hi everyone,
> 
> My Kafka Streams app throws an exception (my own business exception), and then it
> stops consuming messages. Is there any way to make my app continue
> consuming messages when the exception occurs?
> 
> Thanks
> 
> -- 
> *Duy Truong*


Re: Continue to consume messages when exception occurs in Kafka Stream

Posted by Duy Truong <du...@gmail.com>.
OK, I got it, thank you Damian, Eno.

On Fri, Aug 18, 2017 at 4:30 PM, Damian Guy <da...@gmail.com> wrote:

> Duy, if it is in your logic then you need to handle the exception yourself.
> If you don't, it will bubble out and kill the thread.
>
> On Fri, 18 Aug 2017 at 10:27 Duy Truong <du...@gmail.com>
> wrote:
>
> > Hi Eno,
> >
> > Sorry for the late reply. It's not a deserialization exception; it's a
> > pattern matching exception in my logic.
> >
> > val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
> >       (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
> >         datatup._1.rawData.userId
> >       },
> >       (tuple, fbData: FacebookData) => {
> >         val (dmpData, Some(crawData)) = tuple // exception here
> >
> >         // something here
> >
> >       })
> >
> > Thanks
> >
> >
> > On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong <du...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > My Kafka Streams app throws an exception (my own business exception), and then it
> > > stops consuming messages. Is there any way to make my app continue
> > > consuming messages when the exception occurs?
> > >
> > > Thanks
> > >
> > > --
> > > *Duy Truong*
> > >
> >
> >
> >
> > --
> > *Duy Truong*
> >
>



-- 
*Duy Truong*

Re: Continue to consume messages when exception occurs in Kafka Stream

Posted by Damian Guy <da...@gmail.com>.
Duy, if it is in your logic then you need to handle the exception yourself.
If you don't, it will bubble out and kill the thread.
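
A minimal sketch of what "handle the exception yourself" could look like for the joiner discussed in this thread, written in plain Scala without the Kafka dependency. The case classes and both function names are hypothetical stand-ins, since the poster's real types are not shown. The first function matches every case explicitly; the second wraps logic that may throw a `scala.MatchError` in `Try`.

```scala
import scala.util.Try

object SafeJoinSketch {
  // Hypothetical stand-ins for the poster's types.
  final case class DataLog(userId: String)
  final case class CrawlData(url: String)
  final case class FacebookData(name: String)
  final case class JVNModel(userId: String, url: String, name: String)

  // Mirrors the failing pattern: throws scala.MatchError when the
  // second tuple element is None.
  def unsafeJoin(tuple: (DataLog, Option[CrawlData]), fb: FacebookData): JVNModel =
    (tuple: @unchecked) match {
      case (dmpData, Some(crawlData)) =>
        JVNModel(dmpData.userId, crawlData.url, fb.name)
    }

  // Option 1: cover both cases so no exception can occur.
  def joinExplicit(tuple: (DataLog, Option[CrawlData]), fb: FacebookData): Option[JVNModel] =
    tuple match {
      case (dmpData, Some(crawlData)) =>
        Some(JVNModel(dmpData.userId, crawlData.url, fb.name))
      case (_, None) =>
        None // nothing to join with; the caller decides what to do
    }

  // Option 2: keep the existing logic and catch non-fatal exceptions
  // (including MatchError) at the boundary.
  def joinGuarded(tuple: (DataLog, Option[CrawlData]), fb: FacebookData): Option[JVNModel] =
    Try(unsafeJoin(tuple, fb)).toOption
}
```

In a topology, the joiner could return the `Option` and a downstream filter could drop the empty results; whether incomplete records should be dropped, logged, or routed elsewhere is an application decision this sketch does not settle.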

On Fri, 18 Aug 2017 at 10:27 Duy Truong <du...@gmail.com> wrote:

> Hi Eno,
>
> Sorry for the late reply. It's not a deserialization exception; it's a pattern
> matching exception in my logic.
>
> val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
>       (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
>         datatup._1.rawData.userId
>       },
>       (tuple, fbData: FacebookData) => {
>         val (dmpData, Some(crawData)) = tuple // exception here
>
>         // something here
>
>       })
>
> Thanks
>
>
> On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong <du...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > My Kafka Streams app throws an exception (my own business exception), and then it
> > stops consuming messages. Is there any way to make my app continue
> > consuming messages when the exception occurs?
> > Thanks
> >
> > --
> > *Duy Truong*
> >
>
>
>
> --
> *Duy Truong*
>

Re: Continue to consume messages when exception occurs in Kafka Stream

Posted by Duy Truong <du...@gmail.com>.
Hi Eno,

Sorry for the late reply. It's not a deserialization exception; it's a pattern
matching exception in my logic.

val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
      (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
        datatup._1.rawData.userId
      },
      (tuple, fbData: FacebookData) => {
        val (dmpData, Some(crawData)) = tuple // exception here

        // something here

      })

Thanks


On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong <du...@gmail.com>
wrote:

> Hi everyone,
>
> My Kafka Streams app throws an exception (my own business exception), and then it
> stops consuming messages. Is there any way to make my app continue
> consuming messages when the exception occurs?
>
> Thanks
>
> --
> *Duy Truong*
>



-- 
*Duy Truong*