Posted to users@kafka.apache.org by Hamidreza Afzali <ha...@hivestreaming.com> on 2016/11/22 20:37:47 UTC

kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Hi,

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of .map(...) and .groupByKey(...).count(...) does not produce any result.

The topology looks like this:

builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)

It works if we remove .map(...) or .groupByKey(...).count(...).

Is this a bug?

Thanks in advance,
Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Damian Guy <da...@gmail.com>.
Hi Hamid,

Out of interest - what are the results if you use KStreamTestDriver?

Thanks,
Damian

On Thu, 24 Nov 2016 at 12:05 Hamidreza Afzali <
hamidreza.afzali@hivestreaming.com> wrote:

> The map() returns non-null keys and values and produces the following
> stream:
>
> [KSTREAM-MAP-0000000001]: A , 1
> [KSTREAM-MAP-0000000001]: A , 2
> [KSTREAM-MAP-0000000001]: B , 3
>
> The issue arises when the combination of map() and groupByKey().count() is
> used with ProcessorTopologyTestDriver.
>
> I have tried the topology on a local Kafka and got the expected result:
>
> input: <"A-1", 1>, <"A-2", 2>, <"B-1", 3>
> result: <"A":2>, <"B":1>.
>
>
> Thanks,
> Hamid
>
>

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Guozhang Wang <wa...@gmail.com>.
Hamid,

Could you also paste your KStreamTestDriver code, the version that does not
have this issue, into the JIRA? I suspect KStreamTestDriver should have the
same issue, and I am wondering why it does not.

Guozhang

On Tue, Nov 29, 2016 at 10:38 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks!
>
> On 11/29/16 7:18 AM, Hamidreza Afzali wrote:
> > I have created a JIRA issue:
> >
> > https://issues.apache.org/jira/browse/KAFKA-4461
> >
> >
> > Hamid
> >
>
>


-- 
-- Guozhang

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks!

On 11/29/16 7:18 AM, Hamidreza Afzali wrote:
> I have created a JIRA issue:
> 
> https://issues.apache.org/jira/browse/KAFKA-4461
> 
> 
> Hamid
> 


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks!


On Thu, Dec 1, 2016 at 5:17 AM, Hamidreza Afzali <
hamidreza.afzali@hivestreaming.com> wrote:

> I have added an example for KStreamTestDriver to the GitHub Gist and updated
> the JIRA issue.
>
> https://issues.apache.org/jira/browse/KAFKA-4461
>
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
>
>
> Hamid
>
>


-- 
-- Guozhang

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
I have added an example for KStreamTestDriver to the GitHub Gist and updated the JIRA issue.

https://issues.apache.org/jira/browse/KAFKA-4461

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13 


Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
I have created a JIRA issue:

https://issues.apache.org/jira/browse/KAFKA-4461


Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hamid,

Would you mind creating a JIRA? Thanks.


-Matthias

On 11/28/16 9:36 AM, Guozhang Wang wrote:
> Damian, Hamid:
> 
> I looked at the source code and suspect the cause is auto-repartitioning:
> because map() can change the key, the topology does not forward the record
> directly to the child processors but sends it to an intermediate topic
> instead. In our tests we only do "groupByKey" without map, so
> auto-repartitioning is never introduced.
> 
> What bothers me is that KStreamTestDriver should have a similar issue
> (it does not handle intermediate topics either), but Hamid reports that it
> actually works fine.
> 
> Anyway, I think there is an issue with at least ProcessorTopologyTestDriver,
> and very likely with KStreamTestDriver as well, so we should file a JIRA
> and continue investigating and fixing it.
> 
> 
> Guozhang
> 
> 
> On Thu, Nov 24, 2016 at 7:34 AM, Hamidreza Afzali <
> hamidreza.afzali@hivestreaming.com> wrote:
> 
>> Hi Damian,
>>
>> It processes correctly when using KStreamTestDriver.
>>
>> Best,
>> Hamid
>>
>>
> 
> 


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Guozhang Wang <wa...@gmail.com>.
Damian, Hamid:

I looked at the source code and suspect the cause is auto-repartitioning:
because map() can change the key, the topology does not forward the record
directly to the child processors but sends it to an intermediate topic
instead. In our tests we only do "groupByKey" without map, so
auto-repartitioning is never introduced.

What bothers me is that KStreamTestDriver should have a similar issue
(it does not handle intermediate topics either), but Hamid reports that it
actually works fine.

Anyway, I think there is an issue with at least ProcessorTopologyTestDriver,
and very likely with KStreamTestDriver as well, so we should file a JIRA
and continue investigating and fixing it.


Guozhang


On Thu, Nov 24, 2016 at 7:34 AM, Hamidreza Afzali <
hamidreza.afzali@hivestreaming.com> wrote:

> Hi Damian,
>
> It processes correctly when using KStreamTestDriver.
>
> Best,
> Hamid
>
>


-- 
-- Guozhang

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
Hi Damian,

It processes correctly when using KStreamTestDriver.

Best,
Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
The map() returns non-null keys and values and produces the following stream:

[KSTREAM-MAP-0000000001]: A , 1
[KSTREAM-MAP-0000000001]: A , 2
[KSTREAM-MAP-0000000001]: B , 3

The issue arises when the combination of map() and groupByKey().count() is used with ProcessorTopologyTestDriver.

I have tried the topology on a local Kafka and got the expected result:

input: <"A-1", 1>, <"A-2", 2>, <"B-1", 3>
result: <"A":2>, <"B":1>.
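The expected result can also be checked without Kafka. The following is a minimal sketch using plain Scala collections, not Kafka Streams. It assumes the key function keeps the part of the key before the first "-"; the names KeyMappingSketch, prefix and countByPrefix are made up for illustration.

```scala
object KeyMappingSketch {
  // Assumed key function: keep the part of the key before the first "-".
  // String.split never returns null elements, so the new key is non-null here.
  def prefix(key: String): String = key.split("-")(0)

  // Plain-collections stand-in for map(...) followed by groupByKey.count:
  // re-key every record, group by the new key, count the records per key.
  def countByPrefix(records: Seq[(String, Int)]): Map[String, Long] =
    records
      .map { case (k, v) => (prefix(k), v) }
      .groupBy(_._1)
      .map { case (k, group) => k -> group.size.toLong }

  def main(args: Array[String]): Unit = {
    val input = Seq("A-1" -> 1, "A-2" -> 2, "B-1" -> 3)
    println(countByPrefix(input))
  }
}
```

For the input above this gives a count of 2 for "A" and 1 for "B", which matches what the local Kafka run produced. It only shows what the topology should compute and says nothing about how the test drivers behave.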


Thanks,
Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by "Matthias J. Sax" <ma...@confluent.io>.
CACHE_MAX_BYTES_BUFFERING_CONFIG does not have any impact if you query
the state. If you query it, you will always get the latest values.
CACHE_MAX_BYTES_BUFFERING_CONFIG only affects the downstream KTable
changelog stream (but you do not use this anyway).

If I understand you correctly, if you remove the map() you get three
count results: <"A-1":1>, <"A-2":1>, and <"B-1":1>?

Are you sure that your map is correct? I am not familiar with Scala, but
both the key and the value must be non-null to be included in the count.
Could it be that `k.split("-")(0)` returns a null key?

-Matthias

On 11/23/16 7:00 AM, Hamidreza Afzali wrote:
> Thanks Matthias.
> 
> Disabling the cache didn't solve the issue. Here's some sample code:
> 
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> 
> The topology doesn't produce any result, but it works when .map(...) on line 21 is commented out.
> 
> Thanks,
> Hamid
> 


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
Thanks Matthias.

Disabling the cache didn't solve the issue. Here's some sample code:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

The topology doesn't produce any result, but it works when .map(...) on line 21 is commented out.

Thanks,
Hamid


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

Posted by "Matthias J. Sax" <ma...@confluent.io>.
In Kafka 0.10.1 a deduplication cache was introduced for aggregates,
which reduces the downstream load for a KTable changelog stream.

If you want to disable the cache for testing, you can set
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to zero.
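A minimal sketch of that setting in Scala, assuming kafka-streams 0.10.1 is on the classpath. The application id and bootstrap server are placeholder values, not taken from this thread, and the object name CacheOffConfig is made up for illustration.

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object CacheOffConfig {
  val props = new Properties()
  // Placeholder values (assumptions), replace with your own.
  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "count-test")
  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // Zero bytes of record cache, so updates are not buffered and deduplicated
  // before being forwarded downstream.
  props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
}
```

The resulting Properties object would then be passed to whatever builds the StreamsConfig for the test.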

Compare:
http://docs.confluent.io/current/streams/developer-guide.html#memory-management


-Matthias

On 11/22/16 12:37 PM, Hamidreza Afzali wrote:
> Hi,
> 
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of .map(...) and .groupByKey(...).count(...) does not produce any result.
> 
> The topology looks like this:
> 
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>  .map((k, v) => new KeyValue(fn(k), v))
>  .groupByKey(Serdes.String, Serdes.Integer)
>  .count(stateStore)
> 
> It works if we remove .map(...) or .groupByKey(...).count(...).
> 
> Is this a bug?
> 
> Thanks in advance,
> Hamid
>