You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tim Renner <re...@gmail.com> on 2016/06/16 23:12:37 UTC

Kafka Streams KTable-KTable Join Error

Hi all,

I'm trying to do a KTable-KTable join to compute an average within a
tumbling window.
Here's the KStreams code (I've put a fully working example in a gist:
https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a)
KStreamBuilder builder = new KStreamBuilder();

KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");

KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 10000L),
Serdes.String());


KTable<Windowed<String>, Long> longSums =
longs.reduceByKey((v1, v2) -> v1 + v2,
 TimeWindows.of("longSums", 10000L),
 Serdes.String(),
 Serdes.Long());

KTable<Windowed<String>, Double> longAvgs =
longSums.join(longCounts,
 (sum, count) ->
  sum.doubleValue()/count.doubleValue());

longAvgs.toStream((wk, v) -> wk.key())
.to(Serdes.String(),
Serdes.Double(),
"long-avgs");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

When I run this, I get the following exception:

java.util.NoSuchElementException
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95)
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64)
at
org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136)
at
org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117)
at
org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166)
at
org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147)
at
org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77)
at
org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Looks like the join is throwing the exception. Any ideas?

Thanks,
Tim

Re: Kafka Streams KTable-KTable Join Error

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for validating!

Guozhang

On Fri, Jun 17, 2016 at 7:39 PM, Tim Renner <re...@gmail.com>
wrote:

> Hi Guozhang,
>
> Apologies for the delay - had to convert to the new groupBy. The fix got
> rid of the error.
>
> Thanks!
>
> Tim
>



-- 
-- Guozhang

Re: Kafka Streams KTable-KTable Join Error

Posted by Tim Renner <re...@gmail.com>.
Hi Guozhang,

Apologies for the delay - had to convert to the new groupBy. The fix got
rid of the error.

Thanks!

Tim

Re: Kafka Streams KTable-KTable Join Error

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Tim,

I think I found the issue, could you apply the following patch and retry
your app?

https://github.com/apache/kafka/pull/1520

Guozhang


On Thu, Jun 16, 2016 at 11:11 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Tim,
>
> By looking through the source code I suspect it is a bug in Kafka Stream's KStreamWindowReduce
> implementation. I'll do further investigation tomorrow and possibly file a
> JIRA with a patch.
>
>
> Guozhang
>
>
> On Thu, Jun 16, 2016 at 4:12 PM, Tim Renner <re...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm trying to do a KTable-KTable join to compute an average within a
>> tumbling window.
>> Here's the KStreams code (I've put a fully working example in a gist:
>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a)
>> KStreamBuilder builder = new KStreamBuilder();
>>
>> KStream<String, Long> longs = builder.stream(
>> Serdes.String(), Serdes.Long(), "longs");
>>
>> KTable<Windowed<String>, Long> longCounts =
>> longs.countByKey(TimeWindows.of("longCounts", 10000L),
>> Serdes.String());
>>
>>
>> KTable<Windowed<String>, Long> longSums =
>> longs.reduceByKey((v1, v2) -> v1 + v2,
>>  TimeWindows.of("longSums", 10000L),
>>  Serdes.String(),
>>  Serdes.Long());
>>
>> KTable<Windowed<String>, Double> longAvgs =
>> longSums.join(longCounts,
>>  (sum, count) ->
>>   sum.doubleValue()/count.doubleValue());
>>
>> longAvgs.toStream((wk, v) -> wk.key())
>> .to(Serdes.String(),
>> Serdes.Double(),
>> "long-avgs");
>>
>> KafkaStreams streams = new KafkaStreams(builder, config);
>> streams.start();
>>
>> When I run this, I get the following exception:
>>
>> java.util.NoSuchElementException
>> at
>>
>> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95)
>> at
>>
>> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64)
>> at
>>
>> org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136)
>> at
>>
>> org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117)
>> at
>>
>> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166)
>> at
>>
>> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147)
>> at
>>
>> org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77)
>> at
>>
>> org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
>> at
>>
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at
>>
>> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at
>>
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at
>>
>> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136)
>> at
>>
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at
>>
>> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at
>>
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at
>>
>> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>> at
>>
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>> at
>>
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>> at
>>
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>
>> Looks like the join is throwing the exception. Any ideas?
>>
>> Thanks,
>> Tim
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: Kafka Streams KTable-KTable Join Error

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Tim,

By looking through the source code I suspect it is a bug in Kafka
Stream's KStreamWindowReduce
implementation. I'll do further investigation tomorrow and possibly file a
JIRA with a patch.


Guozhang


On Thu, Jun 16, 2016 at 4:12 PM, Tim Renner <re...@gmail.com>
wrote:

> Hi all,
>
> I'm trying to do a KTable-KTable join to compute an average within a
> tumbling window.
> Here's the KStreams code (I've put a fully working example in a gist:
> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a)
> KStreamBuilder builder = new KStreamBuilder();
>
> KStream<String, Long> longs = builder.stream(
> Serdes.String(), Serdes.Long(), "longs");
>
> KTable<Windowed<String>, Long> longCounts =
> longs.countByKey(TimeWindows.of("longCounts", 10000L),
> Serdes.String());
>
>
> KTable<Windowed<String>, Long> longSums =
> longs.reduceByKey((v1, v2) -> v1 + v2,
>  TimeWindows.of("longSums", 10000L),
>  Serdes.String(),
>  Serdes.Long());
>
> KTable<Windowed<String>, Double> longAvgs =
> longSums.join(longCounts,
>  (sum, count) ->
>   sum.doubleValue()/count.doubleValue());
>
> longAvgs.toStream((wk, v) -> wk.key())
> .to(Serdes.String(),
> Serdes.Double(),
> "long-avgs");
>
> KafkaStreams streams = new KafkaStreams(builder, config);
> streams.start();
>
> When I run this, I get the following exception:
>
> java.util.NoSuchElementException
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64)
> at
>
> org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136)
> at
>
> org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> Looks like the join is throwing the exception. Any ideas?
>
> Thanks,
> Tim
>



-- 
-- Guozhang