You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Druhin Sagar Goel <dr...@arrcus.com> on 2018/08/20 19:50:33 UTC

Issue in Kafka 2.0.0 ?

Hi,

I’m using the org.kafka.streams.scala that was released with version 2.0.0. I’m getting a StackOverflowError as follows:

java.lang.StackOverflowError
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
                                                   .
   .
   .
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)

The Scala version I’m using is 2.11.11 and the code leading to the error is as follows (particularly the .filter).

val builder = new StreamsBuilder

val stream = builder.stream[Array[Byte], CaseClassA](args.topic)

val customers = args.config.keys

val predicates = customers.map { customerId =>
  (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
}.toSeq

val customerIdToStream = customers.zip(stream(predicates: _*)).toMap

val y = Printed.toSysOut[Windowed[Key], Long]

customerIdToStream.foreach { case (customerId, customerStream) =>
  val customerConfig = args.config(customerId)
  customerStream
    .flatMap { case (_, message) =>
      message.objects.map {
        case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
      }
    }
    .groupByKey
    .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
    .count()
    .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
    .toStream
    .print(y)
}

Is this a bug with the new scala module related to: https://github.com/lightbend/kafka-streams-scala/issues/63 ?
Or am I doing something wrong?

Thanks,
Druhin

Re: Issue in Kafka 2.0.0 ?

Posted by Ted Yu <yu...@gmail.com>.
Sent out a PR #5543 which fixes the reported bug,
with StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion
modified adding the filter methods.

FYI

On Mon, Aug 20, 2018 at 5:26 PM Ted Yu <yu...@gmail.com> wrote:

> Thanks for pointing me to that PR.
>
> I applied the PR locally but still got:
>
> org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> > testShouldCountClicksPerRegion FAILED
>     java.lang.StackOverflowError
>
> I can go over that PR to see what can be referenced for solving this bug.
>
> FYI
>
> On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
>> is currently being worked on?
>>
>>
>> Guozhang
>>
>> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>> > Thanks for reporting and for creating the ticket!
>> >
>> > -Matthias
>> >
>> > On 8/20/18 5:17 PM, Ted Yu wrote:
>> > > I was able to reproduce what you saw with modification
>> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>> > > I have logged KAFKA-7316 and am looking for a fix.
>> > >
>> > > FYI
>> > >
>> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com>
>> > wrote:
>> > >
>> > >> Isn’t that a bug then? Or can I fix my code somehow?
>> > >>
>> > >>
>> > >>
>> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com
>> <mailto:
>> > >> yuzhihong@gmail.com>) wrote:
>> > >>
>> > >> I think what happened in your use case was that the following
>> implicit
>> > >> from ImplicitConversions.scala kept wrapping the resultant KTable
>> from
>> > >> filter():
>> > >>
>> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>> > >>
>> > >> leading to stack overflow.
>> > >>
>> > >> Cheers
>> > >>
>> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <
>> druhin@arrcus.com>
>> > >> wrote:
>> > >>
>> > >>> Hi,
>> > >>>
>> > >>> I’m using the org.kafka.streams.scala that was released with version
>> > >>> 2.0.0. I’m getting a StackOverflowError as follows:
>> > >>>
>> > >>> java.lang.StackOverflowError
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>> .
>> > >>> .
>> > >>> .
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
>> > KTable.scala:49)
>> > >>>
>> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
>> > error
>> > >>> is as follows (particularly the .filter).
>> > >>>
>> > >>> val builder = new StreamsBuilder
>> > >>>
>> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>> > >>>
>> > >>> val customers = args.config.keys
>> > >>>
>> > >>> val predicates = customers.map { customerId =>
>> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
>> > customerId
>> > >>> }.toSeq
>> > >>>
>> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>> > >>>
>> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
>> > >>>
>> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
>> > >>> val customerConfig = args.config(customerId)
>> > >>> customerStream
>> > >>> .flatMap { case (_, message) =>
>> > >>> message.objects.map {
>> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>> > >>> }
>> > >>> }
>> > >>> .groupByKey
>> > >>>
>> > >>>
>> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
>> > advanceBy(customerConfig.sliderSize))
>> > >>> .count()
>> > >>> .filter { case (_, count) => count >=
>> > >>> customerConfig.frequencyThreshold }
>> > >>> .toStream
>> > >>> .print(y)
>> > >>> }
>> > >>>
>> > >>> Is this a bug with the new scala module related to:
>> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>> > >>> Or am I doing something wrong?
>> > >>>
>> > >>> Thanks,
>> > >>> Druhin
>> > >>>
>> > >>
>> > >
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Re: Issue in Kafka 2.0.0 ?

Posted by Ted Yu <yu...@gmail.com>.
Thanks for pointing me to that PR.

I applied the PR locally but still got:

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> testShouldCountClicksPerRegion FAILED
    java.lang.StackOverflowError

I can go over that PR to see what can be referenced for solving this bug.

FYI

On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wa...@gmail.com> wrote:

> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
> is currently being worked on?
>
>
> Guozhang
>
> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Thanks for reporting and for creating the ticket!
> >
> > -Matthias
> >
> > On 8/20/18 5:17 PM, Ted Yu wrote:
> > > I was able to reproduce what you saw with modification
> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > > I have logged KAFKA-7316 and am looking for a fix.
> > >
> > > FYI
> > >
> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com>
> > wrote:
> > >
> > >> Isn’t that a bug then? Or can I fix my code somehow?
> > >>
> > >>
> > >>
> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
> > >> yuzhihong@gmail.com>) wrote:
> > >>
> > >> I think what happened in your use case was that the following implicit
> > >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> > >> filter():
> > >>
> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> > >>
> > >> leading to stack overflow.
> > >>
> > >> Cheers
> > >>
> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <druhin@arrcus.com
> >
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I’m using the org.kafka.streams.scala that was released with version
> > >>> 2.0.0. I’m getting a StackOverflowError as follows:
> > >>>
> > >>> java.lang.StackOverflowError
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> .
> > >>> .
> > >>> .
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>>
> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
> > error
> > >>> is as follows (particularly the .filter).
> > >>>
> > >>> val builder = new StreamsBuilder
> > >>>
> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> > >>>
> > >>> val customers = args.config.keys
> > >>>
> > >>> val predicates = customers.map { customerId =>
> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> > customerId
> > >>> }.toSeq
> > >>>
> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> > >>>
> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
> > >>>
> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> > >>> val customerConfig = args.config(customerId)
> > >>> customerStream
> > >>> .flatMap { case (_, message) =>
> > >>> message.objects.map {
> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > >>> }
> > >>> }
> > >>> .groupByKey
> > >>>
> > >>>
> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
> > advanceBy(customerConfig.sliderSize))
> > >>> .count()
> > >>> .filter { case (_, count) => count >=
> > >>> customerConfig.frequencyThreshold }
> > >>> .toStream
> > >>> .print(y)
> > >>> }
> > >>>
> > >>> Is this a bug with the new scala module related to:
> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > >>> Or am I doing something wrong?
> > >>>
> > >>> Thanks,
> > >>> Druhin
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Re: Issue in Kafka 2.0.0 ?

Posted by Guozhang Wang <wa...@gmail.com>.
Is this related to the fix https://github.com/apache/kafka/pull/5502 that
is currently being worked on?


Guozhang

On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for reporting and for creating the ticket!
>
> -Matthias
>
> On 8/20/18 5:17 PM, Ted Yu wrote:
> > I was able to reproduce what you saw with modification
> > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > I have logged KAFKA-7316 and am looking for a fix.
> >
> > FYI
> >
> > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com>
> wrote:
> >
> >> Isn’t that a bug then? Or can I fix my code somehow?
> >>
> >>
> >>
> >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
> >> yuzhihong@gmail.com>) wrote:
> >>
> >> I think what happened in your use case was that the following implicit
> >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> >> filter():
> >>
> >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> >>
> >> leading to stack overflow.
> >>
> >> Cheers
> >>
> >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I’m using the org.kafka.streams.scala that was released with version
> >>> 2.0.0. I’m getting a StackOverflowError as follows:
> >>>
> >>> java.lang.StackOverflowError
> >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> KTable.scala:49)
> >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> KTable.scala:49)
> >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> KTable.scala:49)
> >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> KTable.scala:49)
> >>> .
> >>> .
> >>> .
> >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> KTable.scala:49)
> >>>
> >>> The Scala version I’m using is 2.11.11 and the code leading to the
> error
> >>> is as follows (particularly the .filter).
> >>>
> >>> val builder = new StreamsBuilder
> >>>
> >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >>>
> >>> val customers = args.config.keys
> >>>
> >>> val predicates = customers.map { customerId =>
> >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> customerId
> >>> }.toSeq
> >>>
> >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >>>
> >>> val y = Printed.toSysOut[Windowed[Key], Long]
> >>>
> >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> >>> val customerConfig = args.config(customerId)
> >>> customerStream
> >>> .flatMap { case (_, message) =>
> >>> message.objects.map {
> >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> >>> }
> >>> }
> >>> .groupByKey
> >>>
> >>>
> >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
> advanceBy(customerConfig.sliderSize))
> >>> .count()
> >>> .filter { case (_, count) => count >=
> >>> customerConfig.frequencyThreshold }
> >>> .toStream
> >>> .print(y)
> >>> }
> >>>
> >>> Is this a bug with the new scala module related to:
> >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> >>> Or am I doing something wrong?
> >>>
> >>> Thanks,
> >>> Druhin
> >>>
> >>
> >
>
>


-- 
-- Guozhang

Re: Issue in Kafka 2.0.0 ?

Posted by Ted Yu <yu...@gmail.com>.
Hi,
I am aware that more than one method from KTable.scala have this issue.

Once I find a solution, I will apply the fix to the methods you listed.

Cheers

On Mon, Aug 20, 2018 at 5:23 PM Druhin Sagar Goel <dr...@arrcus.com> wrote:

> Thanks a lot Ted!
>
> FYI - The issue is not limited to the
> org.apache.kafka.streams.scala.KTable.filter. It also happens with
> org.apache.kafka.streams.scala.KTable.filterNot,
> org.apache.kafka.streams.scala.KStream.foreach and
> org.apache.kafka.streams.scala.KStream.peek.
>
> - Druhin
>
>
> On August 20, 2018 at 5:19:36 PM, Matthias J. Sax (matthias@confluent.io
> <ma...@confluent.io>) wrote:
>
> Thanks for reporting and for creating the ticket!
>
> -Matthias
>
> On 8/20/18 5:17 PM, Ted Yu wrote:
> > I was able to reproduce what you saw with modification
> > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > I have logged KAFKA-7316 and am looking for a fix.
> >
> > FYI
> >
> > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com>
> wrote:
> >
> >> Isn’t that a bug then? Or can I fix my code somehow?
> >>
> >>
> >>
> >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
> >> yuzhihong@gmail.com>) wrote:
> >>
> >> I think what happened in your use case was that the following implicit
> >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> >> filter():
> >>
> >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> >>
> >> leading to stack overflow.
> >>
> >> Cheers
> >>
> >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I’m using the org.kafka.streams.scala that was released with version
> >>> 2.0.0. I’m getting a StackOverflowError as follows:
> >>>
> >>> java.lang.StackOverflowError
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>> .
> >>> .
> >>> .
> >>> at
> org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>
> >>> The Scala version I’m using is 2.11.11 and the code leading to the
> error
> >>> is as follows (particularly the .filter).
> >>>
> >>> val builder = new StreamsBuilder
> >>>
> >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >>>
> >>> val customers = args.config.keys
> >>>
> >>> val predicates = customers.map { customerId =>
> >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> customerId
> >>> }.toSeq
> >>>
> >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >>>
> >>> val y = Printed.toSysOut[Windowed[Key], Long]
> >>>
> >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> >>> val customerConfig = args.config(customerId)
> >>> customerStream
> >>> .flatMap { case (_, message) =>
> >>> message.objects.map {
> >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> >>> }
> >>> }
> >>> .groupByKey
> >>>
> >>>
> >>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> >>> .count()
> >>> .filter { case (_, count) => count >=
> >>> customerConfig.frequencyThreshold }
> >>> .toStream
> >>> .print(y)
> >>> }
> >>>
> >>> Is this a bug with the new scala module related to:
> >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> >>> Or am I doing something wrong?
> >>>
> >>> Thanks,
> >>> Druhin
> >>>
> >>
> >
>
>

Re: Issue in Kafka 2.0.0 ?

Posted by Druhin Sagar Goel <dr...@arrcus.com>.
Thanks a lot Ted!

FYI - The issue is not limited to the org.apache.kafka.streams.scala.KTable.filter. It also happens with org.apache.kafka.streams.scala.KTable.filterNot, org.apache.kafka.streams.scala.KStream.foreach and org.apache.kafka.streams.scala.KStream.peek.

- Druhin


On August 20, 2018 at 5:19:36 PM, Matthias J. Sax (matthias@confluent.io<ma...@confluent.io>) wrote:

Thanks for reporting and for creating the ticket!

-Matthias

On 8/20/18 5:17 PM, Ted Yu wrote:
> I was able to reproduce what you saw with modification
> to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> I have logged KAFKA-7316 and am looking for a fix.
>
> FYI
>
> On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com> wrote:
>
>> Isn’t that a bug then? Or can I fix my code somehow?
>>
>>
>>
>> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
>> yuzhihong@gmail.com>) wrote:
>>
>> I think what happened in your use case was that the following implicit
>> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> filter():
>>
>> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>>
>> leading to stack overflow.
>>
>> Cheers
>>
>> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I’m using the org.kafka.streams.scala that was released with version
>>> 2.0.0. I’m getting a StackOverflowError as follows:
>>>
>>> java.lang.StackOverflowError
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> .
>>> .
>>> .
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>>
>>> The Scala version I’m using is 2.11.11 and the code leading to the error
>>> is as follows (particularly the .filter).
>>>
>>> val builder = new StreamsBuilder
>>>
>>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>>>
>>> val customers = args.config.keys
>>>
>>> val predicates = customers.map { customerId =>
>>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>>> }.toSeq
>>>
>>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>>>
>>> val y = Printed.toSysOut[Windowed[Key], Long]
>>>
>>> customerIdToStream.foreach { case (customerId, customerStream) =>
>>> val customerConfig = args.config(customerId)
>>> customerStream
>>> .flatMap { case (_, message) =>
>>> message.objects.map {
>>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>>> }
>>> }
>>> .groupByKey
>>>
>>>
>> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>>> .count()
>>> .filter { case (_, count) => count >=
>>> customerConfig.frequencyThreshold }
>>> .toStream
>>> .print(y)
>>> }
>>>
>>> Is this a bug with the new scala module related to:
>>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>>> Or am I doing something wrong?
>>>
>>> Thanks,
>>> Druhin
>>>
>>
>


Re: Issue in Kafka 2.0.0 ?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for reporting and for creating the ticket!

-Matthias

On 8/20/18 5:17 PM, Ted Yu wrote:
> I was able to reproduce what you saw with modification
> to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> I have logged KAFKA-7316 and am looking for a fix.
> 
> FYI
> 
> On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com> wrote:
> 
>> Isn’t that a bug then? Or can I fix my code somehow?
>>
>>
>>
>> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
>> yuzhihong@gmail.com>) wrote:
>>
>> I think what happened in your use case was that the following implicit
>> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> filter():
>>
>> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>>
>> leading to stack overflow.
>>
>> Cheers
>>
>> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I’m using the org.kafka.streams.scala that was released with version
>>> 2.0.0. I’m getting a StackOverflowError as follows:
>>>
>>> java.lang.StackOverflowError
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>> .
>>> .
>>> .
>>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>>>
>>> The Scala version I’m using is 2.11.11 and the code leading to the error
>>> is as follows (particularly the .filter).
>>>
>>> val builder = new StreamsBuilder
>>>
>>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>>>
>>> val customers = args.config.keys
>>>
>>> val predicates = customers.map { customerId =>
>>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>>> }.toSeq
>>>
>>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>>>
>>> val y = Printed.toSysOut[Windowed[Key], Long]
>>>
>>> customerIdToStream.foreach { case (customerId, customerStream) =>
>>> val customerConfig = args.config(customerId)
>>> customerStream
>>> .flatMap { case (_, message) =>
>>> message.objects.map {
>>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>>> }
>>> }
>>> .groupByKey
>>>
>>>
>> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>>> .count()
>>> .filter { case (_, count) => count >=
>>> customerConfig.frequencyThreshold }
>>> .toStream
>>> .print(y)
>>> }
>>>
>>> Is this a bug with the new scala module related to:
>>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>>> Or am I doing something wrong?
>>>
>>> Thanks,
>>> Druhin
>>>
>>
> 


Re: Issue in Kafka 2.0.0 ?

Posted by Ted Yu <yu...@gmail.com>.
I was able to reproduce what you saw with modification
to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
I have logged KAFKA-7316 and am looking for a fix.

FYI

On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dr...@arrcus.com> wrote:

> Isn’t that a bug then? Or can I fix my code somehow?
>
>
>
> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<mailto:
> yuzhihong@gmail.com>) wrote:
>
> I think what happened in your use case was that the following implicit
> from ImplicitConversions.scala kept wrapping the resultant KTable from
> filter():
>
> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>
> leading to stack overflow.
>
> Cheers
>
> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
> wrote:
>
> > Hi,
> >
> > I’m using the org.kafka.streams.scala that was released with version
> > 2.0.0. I’m getting a StackOverflowError as follows:
> >
> > java.lang.StackOverflowError
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> > .
> > .
> > .
> > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >
> > The Scala version I’m using is 2.11.11 and the code leading to the error
> > is as follows (particularly the .filter).
> >
> > val builder = new StreamsBuilder
> >
> > val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >
> > val customers = args.config.keys
> >
> > val predicates = customers.map { customerId =>
> > (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> > }.toSeq
> >
> > val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> >
> > val y = Printed.toSysOut[Windowed[Key], Long]
> >
> > customerIdToStream.foreach { case (customerId, customerStream) =>
> > val customerConfig = args.config(customerId)
> > customerStream
> > .flatMap { case (_, message) =>
> > message.objects.map {
> > case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > }
> > }
> > .groupByKey
> >
> >
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> > .count()
> > .filter { case (_, count) => count >=
> > customerConfig.frequencyThreshold }
> > .toStream
> > .print(y)
> > }
> >
> > Is this a bug with the new scala module related to:
> > https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > Or am I doing something wrong?
> >
> > Thanks,
> > Druhin
> >
>

Re: Issue in Kafka 2.0.0 ?

Posted by Druhin Sagar Goel <dr...@arrcus.com>.
Isn’t that a bug then? Or can I fix my code somehow?



On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhihong@gmail.com<ma...@gmail.com>) wrote:

I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from
filter():

implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.

Cheers

On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
wrote:

> Hi,
>
> I’m using the org.kafka.streams.scala that was released with version
> 2.0.0. I’m getting a StackOverflowError as follows:
>
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> .
> .
> .
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
>
> val builder = new StreamsBuilder
>
> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>
> val customers = args.config.keys
>
> val predicates = customers.map { customerId =>
> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
>
> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>
> val y = Printed.toSysOut[Windowed[Key], Long]
>
> customerIdToStream.foreach { case (customerId, customerStream) =>
> val customerConfig = args.config(customerId)
> customerStream
> .flatMap { case (_, message) =>
> message.objects.map {
> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> }
> }
> .groupByKey
>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> .count()
> .filter { case (_, count) => count >=
> customerConfig.frequencyThreshold }
> .toStream
> .print(y)
> }
>
> Is this a bug with the new scala module related to:
> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> Or am I doing something wrong?
>
> Thanks,
> Druhin
>

Re: Issue in Kafka 2.0.0 ?

Posted by Ted Yu <yu...@gmail.com>.
I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from
filter():

  implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.

Cheers

On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dr...@arrcus.com>
wrote:

> Hi,
>
> I’m using the org.kafka.streams.scala that was released with version
> 2.0.0. I’m getting a StackOverflowError as follows:
>
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>                                                    .
>    .
>    .
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
>
> val builder = new StreamsBuilder
>
> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>
> val customers = args.config.keys
>
> val predicates = customers.map { customerId =>
>   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
>
> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
>
> val y = Printed.toSysOut[Windowed[Key], Long]
>
> customerIdToStream.foreach { case (customerId, customerStream) =>
>   val customerConfig = args.config(customerId)
>   customerStream
>     .flatMap { case (_, message) =>
>       message.objects.map {
>         case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>       }
>     }
>     .groupByKey
>
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>     .count()
>     .filter { case (_, count) => count >=
> customerConfig.frequencyThreshold }
>     .toStream
>     .print(y)
> }
>
> Is this a bug with the new scala module related to:
> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> Or am I doing something wrong?
>
> Thanks,
> Druhin
>