Posted to users@kafka.apache.org by Ian Duffy <ia...@ianduffy.ie> on 2017/08/24 10:17:19 UTC

Kafka Streams - groupByKey and Count, null result on join

Hi All,

I'm building a streams application where I wish to take action on the
input when a certain frequency of the input has been seen.

At the moment the application roughly goes:

frequency table = Input Stream -> groupByKey -> Count

input stream with counts = leftJoin frequency table and input stream

branch input stream with counts where count > some threshold send to some
other topic, if <= some threshold send to delay processing topic.

delay stream with counts = leftJoin frequency table and delay stream

branch delay stream with counts where count > some threshold send to some
other topic, if <= some threshold send to delay processing topic.

I've seen intermittent failures where the frequency table gives a null
on join. No matter how long I wait, content coming back in on the delayed
stream keeps getting nulls for the count when looking up the frequency
table.

Any ideas why this might be occurring?

Thanks,

Ian.

Re: Kafka Streams - groupByKey and Count, null result on join

Posted by Guozhang Wang <wa...@gmail.com>.
Ian,

You can try the `toString` function of KafkaStreams (assuming you're on
the older version) to print the constructed topology and check whether
multiple repartition topics are created.

From your code snippet it is a bit hard to tell, since I do not know
whether repeatedInputStream is already read from a topic partitioned by
its key: if it is, it should not require another repartition.
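
The partitioning question matters because a stream-table join is evaluated per partition: a record only finds a count if it arrives on the partition that holds that key's slice of the table. The following is a toy model in plain Scala, not Kafka Streams code. `partitionFor`, `localCounts` and `lookup` are hypothetical names, and `hashCode` is an assumed stand-in for Kafka's real key hashing.

```scala
// Toy model of co-partitioning; plain Scala, no Kafka dependency.
object CoPartitioningSketch {
  // Stand-in partitioner. Kafka's default partitioner hashes the serialized
  // key with murmur2; hashCode is used here only to keep the sketch small.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Per-partition slice of the counts table, as each stream task would
  // hold it in its local state store.
  def localCounts(domains: Seq[String], numPartitions: Int): Map[Int, Map[String, Long]] =
    domains.groupBy(partitionFor(_, numPartitions)).map { case (partition, ds) =>
      partition -> ds.groupBy(identity).map { case (d, xs) => d -> xs.size.toLong }
    }

  // A task can only look up keys in the slice for the partition the
  // record arrived on.
  def lookup(table: Map[Int, Map[String, Long]], recordPartition: Int, domain: String): Option[Long] =
    table.getOrElse(recordPartition, Map.empty[String, Long]).get(domain)
}
```

In this model, a record for a domain that arrives on any partition other than `partitionFor(domain, n)` gets `None` every time, however long one waits.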


Guozhang


On Thu, Aug 31, 2017 at 2:37 AM, Ian Duffy <ia...@ianduffy.ie> wrote:

> Hi Guozhang,
>
> Looking at this again I'm a little confused. I'm not using any
> maps and, as far as I know, the selectKey is already causing a repartition:
> "base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".
>
> Is using the same KTable in multiple different leftJoins going to be an
> issue?
>
>
>   private val contentInputStreamWithKeyDomain: KStream[String, Content] =
>     contentInputStream.selectKey((_: String, value: Content) => value.domain)
>   private val contentCountsByDomain: KTable[String, Long] =
>     contentInputStreamWithKeyDomain.groupByKey().count()
>
>   private val contentInputStreamWithCounts: KStream[String, ContentCount] =
>     contentInputStreamWithKeyDomain.leftJoin(
>       contentCountsByDomain, (content: Content, count: Long) => {
>         info(s"Joining input ${content.url} with domain content count $count")
>         ContentCount(content, if (count == null) new Long(0) else count)
>       })
>
>   private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
>     repeatedInputStream.leftJoin(
>       contentCountsByDomain, (content: Content, count: Long) => {
>         info(s"Joining repeated input ${content.url} with domain content count $count")
>         ContentCount(content, if (count == null) new Long(0) else count)
>       })



-- 
-- Guozhang

Re: Kafka Streams - groupByKey and Count, null result on join

Posted by Ian Duffy <ia...@ianduffy.ie>.
Hi Guozhang,

Looking at this again I'm a little confused. I'm not using any
maps and, as far as I know, the selectKey is already causing a repartition:
"base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".

Is using the same KTable in multiple different leftJoins going to be an
issue?


  private val contentInputStreamWithKeyDomain: KStream[String, Content] =
    contentInputStream.selectKey((_: String, value: Content) => value.domain)
  private val contentCountsByDomain: KTable[String, Long] =
    contentInputStreamWithKeyDomain.groupByKey().count()

  private val contentInputStreamWithCounts: KStream[String, ContentCount] =
    contentInputStreamWithKeyDomain.leftJoin(
      contentCountsByDomain, (content: Content, count: Long) => {
        info(s"Joining input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })

  private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
    repeatedInputStream.leftJoin(
      contentCountsByDomain, (content: Content, count: Long) => {
        info(s"Joining repeated input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })
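
The null check in both joiners could be written more idiomatically by wrapping the possibly-null count in `Option`. This is a minimal sketch, assuming `Long` in the snippet refers to `java.lang.Long` (which `new Long(0)` suggests). `countOrZero` is a hypothetical helper name, not part of the application.

```scala
object CountDefaults {
  // A left join passes null for the table side when no row exists for the key.
  // Option(null) is None, so the missing count falls back to zero.
  def countOrZero(count: java.lang.Long): Long =
    Option(count).map(_.longValue()).getOrElse(0L)
}
```

Inside a joiner this would read `ContentCount(content, CountDefaults.countOrZero(count))`, if `ContentCount` accepts a Scala `Long`.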


On 29 August 2017 at 17:18, Guozhang Wang <wa...@gmail.com> wrote:

> Ian, if your issue is indeed due to KAFKA-4601, currently the best
> workaround is what I mentioned in that ticket: manually call `through` to
> do the repartition, and then, reading from that repartition topic, do the
> aggregation first, followed by the join. This ensures that each incoming
> record always goes through the aggregation processor first and then the join
> processor. The downside is that the repartition topic is then no longer
> internal and has to be managed by you, the user.
>
> We are working on fixing KAFKA-4601 soon, but this may involve a rather
> general fix, to refactor the DSL translation to go beyond
> operator-by-operator steps.
>
>
> Guozhang

Re: Kafka Streams - groupByKey and Count, null result on join

Posted by Guozhang Wang <wa...@gmail.com>.
Ian, if your issue is indeed due to KAFKA-4601, currently the best
workaround is what I mentioned in that ticket: manually call `through` to
do the repartition, and then, reading from that repartition topic, do the
aggregation first, followed by the join. This ensures that each incoming
record always goes through the aggregation processor first and then the join
processor. The downside is that the repartition topic is then no longer
internal and has to be managed by you, the user.

We are working on fixing KAFKA-4601 soon, but this may involve a rather
general fix, to refactor the DSL translation to go beyond
operator-by-operator steps.


Guozhang

On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <ia...@ianduffy.ie> wrote:

> Thanks for the information Guozhang.
>
> Any recommendations for handling or working around this? It's making tests
> very flakey.



-- 
-- Guozhang

Re: Kafka Streams - groupByKey and Count, null result on join

Posted by Ian Duffy <ia...@ianduffy.ie>.
Thanks for the information Guozhang.

Any recommendations for handling or working around this? It's making tests
very flakey.

On 24 August 2017 at 23:48, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Ian,
>
> I suspect it has something to do with your specified topology, in which
> the join is triggered first and the aggregation is updated afterwards.
>
> For example, take a look at this ticket:
> https://issues.apache.org/jira/browse/KAFKA-4601
>
> As its printed topology shows, because of the repartition topic the join
> operator may be triggered before the aggregation operator, and hence the
> Counts table would return `null`, indicating it has not received a
> record yet.
>
>
> Guozhang

Re: Kafka Streams - groupByKey and Count, null result on join

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Ian,

I suspect it has something to do with your specified topology, in which
the join is triggered first and the aggregation is updated afterwards.

For example, take a look at this ticket:
https://issues.apache.org/jira/browse/KAFKA-4601

As its printed topology shows, because of the repartition topic the join
operator may be triggered before the aggregation operator, and hence the
Counts table would return `null`, indicating it has not received a
record yet.
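
The effect of processor order can be illustrated with a toy model in plain Scala. This is not Kafka Streams code and only sketches the ordering hypothesis described above. `CountStore`, `joinThenAggregate` and `aggregateThenJoin` are hypothetical names, and `None` plays the role of the `null` returned by the table lookup.

```scala
import scala.collection.mutable

// Toy model of per-record processor ordering; plain Scala, no Kafka dependency.
object JoinOrderSketch {
  // Stand-in for the state store behind the counts table.
  final class CountStore {
    private val counts = mutable.Map.empty[String, Long]
    def increment(key: String): Unit =
      counts.update(key, counts.getOrElse(key, 0L) + 1L)
    def get(key: String): Option[Long] = counts.get(key)
  }

  // The join sees each record before the aggregation has counted it,
  // so the first record for a key finds nothing in the store.
  def joinThenAggregate(keys: List[String]): List[(String, Option[Long])] = {
    val store = new CountStore
    keys.map { key =>
      val seen = store.get(key)
      store.increment(key)
      key -> seen
    }
  }

  // The aggregation runs first, so the join always finds a count.
  def aggregateThenJoin(keys: List[String]): List[(String, Option[Long])] = {
    val store = new CountStore
    keys.map { key =>
      store.increment(key)
      key -> store.get(key)
    }
  }
}
```

In the first ordering the first record for each key joins against an empty store. In the second, every record sees a count of at least one.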


Guozhang


On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <ia...@ianduffy.ie> wrote:

> Hi All,
>
> I'm building a streams application where I wish to take action on the
> input when a certain frequency of the input has been seen.
>
> At the moment the application roughly goes:
>
> frequency table = Input Stream -> groupByKey -> Count
>
> input stream with counts = leftJoin frequency table and input stream
>
> branch input stream with counts where count > some threshold send to some
> other topic, if <= some threshold send to delay processing topic.
>
> delay stream with counts = leftJoin frequency table and delay stream
>
> branch delay stream with counts where count > some threshold send to some
> other topic, if <= some threshold send to delay processing topic.
>
> I've seen intermittent failures where the frequency table gives a null
> on join. No matter how long I wait, content coming back in on the delayed
> stream keeps getting nulls for the count when looking up the frequency
> table.
>
> Any ideas why this might be occurring?
>
> Thanks,
>
> Ian.
>



-- 
-- Guozhang