Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2020/01/30 18:29:31 UTC

Kafka streams DSL going into infinite loop

Hi,
In one case I have found that when I define my topology using the Streams
DSL, it tends to go into an infinite loop.
This usually happens if I start my stream, shut it down, and restart it
again after a while (by which time the source topic has moved ahead).

Stream processing seems to be stuck in a loop and does not progress any
further.

My topology is something like this:

source.branch(
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...
    )
stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12))
stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34))
stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56))
stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78))
stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234))
stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478))
stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567))
final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal))
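
(For readers following along: a minimal sketch of how this branch-then-join
shape looks in the Java DSL. The topic name, predicates, ValueJoiner, and
the 5-minute join window below are assumptions for illustration, not taken
from the original code.)

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("source-topic");

    // branch() returns one stream per predicate, in declaration order
    KStream<String, String>[] branches = source.branch(
            (k, v) -> v.startsWith("a"),    // hypothetical predicate for stream1
            (k, v) -> v.startsWith("b"));   // hypothetical predicate for stream2

    // windowed stream-stream join; records must fall into the same window
    KStream<String, String> stream12 = branches[0]
            .join(branches[1],
                  (v1, v2) -> v1 + "|" + v2,              // hypothetical ValueJoiner
                  JoinWindows.of(Duration.ofMinutes(5)))  // assumed window size
            .peek((k, v12) -> System.out.println(v12));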

What I observe is that it keeps printing log(vFinal) with the same value
again and again and does not seem to progress any further.
Any idea why this may be happening? What can I check to understand what
could be going wrong?

If I stop Kafka, delete all the data and then restart everything, it all
works fine from the next set of data onwards.
But then I lose the source streams' data from before the restart (when it
went into this infinite loop), which never gets processed.

Thanks
Sachin

Re: Kafka streams DSL going into infinite loop

Posted by "Matthias J. Sax" <ma...@confluent.io>.
>> I really don't know what TOPOLOGY_OPTIMIZATION is for.

If you enable optimization, Kafka Streams tries to generate a more
efficient topology when translating the DSL program. We are working on
some more documentation of this feature at the moment:
https://cwiki.apache.org/confluence/display/KAFKA/DSL+Optimizer

Can you share your `TopologyDescription`? Also, do you see the same
problem if you disable topology optimization? I would like to understand
whether it might be related to repartition topics (and the optimization
that might merge repartition topics).
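
(A quick sketch of how to get both, assuming the DSL program's
StreamsBuilder is in scope as `builder` and a 2.x Streams API: build the
Topology with the same props so optimizations are applied, print its
description, and flip the flag to compare.)

    // pass the props to build() so TOPOLOGY_OPTIMIZATION takes effect
    Topology topology = builder.build(props);
    System.out.println(topology.describe());  // the TopologyDescription

    // to compare, disable optimization and rebuild:
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);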

Also, you did not answer my question from before: does the application
commit offsets on the input (and repartition) topics, which would show
whether it makes progress or not?
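
(One way to check this, sketched with the admin client; the application.id
doubles as the consumer group id, so kafka-consumer-groups.sh --describe
works as well. "my-app-id" and the bootstrap server are placeholders.)

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets("my-app-id")  // group id == application.id
                .partitionsToOffsetAndMetadata()
                .get();  // blocks; caller must handle/declare the exceptions
        // if these offsets advance between checks, the app is making progress
        committed.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }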

I have no idea yet what the problem could be.

-Matthias



On 1/30/20 11:16 AM, Sachin Mittal wrote:
> [...]


Re: Kafka streams DSL going into infinite loop

Posted by Sachin Mittal <sj...@gmail.com>.
The only Streams-specific props I am using are:

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

Yes, there are three sub-topologies and the key does change between joins.
Does this have something to do with any of the above configs?
I really don't know what TOPOLOGY_OPTIMIZATION is for.

The way I start my stream is:

    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
       ...
    });
    try {
        streams.cleanUp();
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
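
(For comparison, the shutdown hook in the Kafka Streams quickstart closes
the streams instance and releases the latch; a sketch of what the elided
body typically looks like:)

    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();    // stop all stream threads cleanly
            latch.countDown();  // lets the main thread fall through latch.await()
        }
    });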

Is this method of starting the stream OK?
There is indeed no loop, so why does the stream processing get stuck
logging the final result forever?

Is there any direction you can suggest looking into? Any more logging to
enable at the Streams or Kafka level?

Thanks
Sachin



On Fri, Jan 31, 2020 at 12:31 AM Matthias J. Sax <ma...@confluent.io> wrote:

> [...]

Re: Kafka streams DSL going into infinite loop

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Your program does not seem to contain a loop. Hence, it's unclear to me
at the moment what the issue could be.

Does the application commit offsets on the input topic? That would be an
indicator that it actually does make progress.

Do you change the key between joins, i.e., are there multiple
sub-topologies that are connected via repartition topics, or is it one
single sub-topology?
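
(Background for this question, as an illustration rather than the actual
code from this thread: any key-changing operation before a join forces
Kafka Streams to insert an internal repartition topic, which splits the
topology into sub-topologies; extractJoinKey below is a hypothetical
helper.)

    // selectKey() (like map() or groupBy() with a new key) marks the stream
    // for repartitioning; a subsequent join then reads from an internal
    // repartition topic, creating a separate sub-topology.
    KStream<String, String> rekeyed =
            stream12.selectKey((k, v) -> extractJoinKey(v));  // hypothetical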


-Matthias

On 1/30/20 10:29 AM, Sachin Mittal wrote:
> [...]