Posted to users@kafka.apache.org by Hamidreza Afzali <ha...@hivestreaming.com> on 2016/09/28 09:48:47 UTC

Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

Hi,

We are using the latest Kafka 0.10.1 branch. Combining ProcessorTopologyTestDriver with WindowedStreamPartitioner results in a division-by-zero exception because the list of partitions is empty:

https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47
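A minimal sketch of the failure mode described above, assuming the partitioner picks a partition by taking a non-negative hash of the key bytes modulo the number of partitions. The object name `EmptyPartitionListDemo`, the helper `partitionFor`, and the use of `java.util.Arrays.hashCode` as the hash are illustrative assumptions, not the Kafka implementation:

```scala
object EmptyPartitionListDemo {
  // Stand-in for a hash-based partitioner (assumption: not Kafka's actual hash).
  // Clears the sign bit so the hash is non-negative, then takes it modulo
  // the partition count.
  def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int =
    (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    val key = "event-key".getBytes("UTF-8")

    // With a real partition count, the result is a valid partition index.
    println(s"partition with 4 partitions: ${partitionFor(key, 4)}")

    // An empty partition list yields numPartitions = 0, and integer
    // modulo by zero throws ArithmeticException on the JVM.
    val noPartitions = List.empty[Int]
    try {
      partitionFor(key, noPartitions.size)
    } catch {
      case e: ArithmeticException =>
        println(s"failed with ArithmeticException: ${e.getMessage}")
    }
  }
}
```

Under these assumptions, any code path that derives the partition count from an empty list and then applies a modulo will fail the same way.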

Our topology looks similar to this:

  builder.stream("events")
    .groupByKey(...)
    .aggregate(...,
      TimeWindows.of(1 * 60 * 1000L)
    )
    .mapValues(_.size: Integer)
    .to(windowedSerde, Serdes.Integer(), "events-over-time")

If we pass our own partitioner to .to(), it works:

  class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
    override def partition(k: K, v: V, numPartitions: Int): Integer = {
      // return an integer between 0 and numPartitions-1, or null if the default partitioning logic should be used
      null
    }
  }

Is this a bug?

Thank you in advance,
Hamid


Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

Posted by Guozhang Wang <wa...@gmail.com>.
I have pushed a hotfix to both trunk and 0.10.1. Could you check whether
the issue is now resolved?

On Mon, Oct 3, 2016 at 7:18 AM, Hamidreza Afzali <
hamidreza.afzali@hivestreaming.com> wrote:

> Thanks Guozhang. We use ProcessorTopologyTestDriver for unit tests.
>
> Hamid


-- 
-- Guozhang

Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
Thanks Guozhang. I can confirm the issue is resolved.

Hamid


Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

Posted by Hamidreza Afzali <ha...@hivestreaming.com>.
Thanks Guozhang. We use ProcessorTopologyTestDriver for unit tests.

Hamid




Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for reporting this, Hamid. The ProcessorTopologyTestDriver is used
only in ProcessorTopologyTest and is currently not expected to be used
elsewhere, which is why we override the MockProducer's
partitionsFor function to return only an empty list. Is there any
particular reason that you want to use ProcessorTopologyTestDriver in your
testing / staging environment? In general, I'd recommend using the
full-fledged Kafka Streams library for integration tests with synthetic
testing data.

That said, in RecordCollector we can at least catch the `numPartitions` = 0
case and throw a more informative error. I will file a patch for this improvement.
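As a rough illustration of that kind of guard (a sketch only, not the actual patch): the object `PartitionCountGuard`, the helper `requirePartitions`, the exception type, and the message wording are all hypothetical, and partitions are modeled as a plain sequence of partition ids rather than Kafka's own types.

```scala
object PartitionCountGuard {
  // Hypothetical helper: returns the partition count for a topic, or fails
  // with a descriptive message instead of letting a later modulo divide by zero.
  def requirePartitions(topic: String, partitions: Seq[Int]): Int = {
    if (partitions.isEmpty)
      throw new IllegalStateException(
        s"Topic '$topic' has no known partitions; cannot choose a partition for the record.")
    partitions.size
  }

  def main(args: Array[String]): Unit = {
    // A topic with three known partitions passes the check.
    println(requirePartitions("events-over-time", Seq(0, 1, 2)))

    // An empty partition list is rejected up front with a readable error.
    try {
      println(requirePartitions("events-over-time", Seq.empty))
    } catch {
      case e: IllegalStateException => println(e.getMessage)
    }
  }
}
```

Checking the count before it reaches the partitioner makes the error point at the missing partition metadata rather than at an arithmetic failure deep inside the partitioner.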


Guozhang




-- 
-- Guozhang