Posted to users@kafka.apache.org by Debasish Ghosh <gh...@gmail.com> on 2017/11/15 08:36:35 UTC

Problem with KGroupedStream.count in 1.0.0

Hello -

In my Kafka Streams 0.11 application I have the following transformation ..

    val hosts: KStream[Array[Byte], String] =
      logRecords.mapValues(record => record.host)

    // we are changing the key here so that we can do a groupByKey later
    val hostPairs: KStream[String, String] =
      hosts.map((_, value) => new KeyValue(value, value))

    // keys have changed - hence need new serdes
    val groupedStream: KGroupedStream[String, String] =
      hostPairs.groupByKey(stringSerde, stringSerde)

    val counts: KTable[String, java.lang.Long] =
      groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
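An aside on migration: in 1.0.0 the groupByKey(Serde, Serde) overload used
above is itself deprecated, in favour of a Serialized parameter. A hedged
sketch of the replacement, assuming the same stringSerde as above:

    import org.apache.kafka.streams.kstream.Serialized

    // `with` is a Scala keyword, hence the backticks around Java's static Serialized.with
    val groupedStream: KGroupedStream[String, String] =
      hostPairs.groupByKey(Serialized.`with`(stringSerde, stringSerde))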

Now in 1.0.0, the count(String) variant of KGroupedStream used above has been
deprecated, and its replacement takes only a Materialized whose store type is
KeyValueStore[Bytes, Array[Byte]] ..

> KTable<K, java.lang.Long> count(
>     Materialized<K, java.lang.Long,
>                  KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
>
> Count the number of records in this stream by the grouped key.


I **cannot** simply do the following, since my KeyValueStore holds String keys
rather than raw bytes ..

    val counts: KTable[String, java.lang.Long] =
      groupedStream.count(
        Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](
          ACCESS_COUNT_PER_HOST_STORE))

Any suggestions as to how I can move the above code to 1.0.0?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with KGroupedStream.count in 1.0.0

Posted by Debasish Ghosh <gh...@gmail.com>.
Thanks .. It works ..


Re: Problem with KGroupedStream.count in 1.0.0

Posted by Damian Guy <da...@gmail.com>.
Yes, right, that would be because the default serializer is set to bytes.
Sorry, I should have spotted that. Your Materialized should look something
like:

Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](
    ACCESS_COUNT_PER_HOST_STORE)
  .withKeySerde(Serdes.String())

Thanks,
Damian
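
Putting the fix together, the complete migrated count call would look
something like the sketch below. This is only a sketch, not code from the
thread: it assumes the same groupedStream and store name as in the original
post, and it sets the value serde explicitly even though count() supplies a
Long serde when none is given.

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.state.KeyValueStore

    val counts: KTable[String, java.lang.Long] =
      groupedStream.count(
        Materialized
          .as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](
            ACCESS_COUNT_PER_HOST_STORE)
          .withKeySerde(Serdes.String())   // override the byte-array default key serde
          .withValueSerde(Serdes.Long()))  // explicit for clarity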



Re: Problem with KGroupedStream.count in 1.0.0

Posted by Debasish Ghosh <gh...@gmail.com>.
It's not working, unfortunately .. I get the following exception at runtime ..

Exception in thread "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1"
java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:168)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:60)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:70)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)


Things only work when I change the key of the first stream to Array[Byte],
like this ..

    val hosts: KStream[Array[Byte], Array[Byte]] =
      logRecords.mapValues(record => record.host.getBytes("UTF-8"))

regards.
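
The exception is consistent with what Damian points out above: the
application's default serdes are evidently byte arrays, so a Materialized that
never calls withKeySerde tries to push String keys through the
ByteArraySerializer. A hypothetical config sketch of that situation (the
property values here are assumed, not taken from the thread):

    import java.util.Properties
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.StreamsConfig

    val props = new Properties()
    // byte-array defaults: any store that does not override its serdes
    // will serialize keys and values as raw bytes
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass)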


Re: Problem with KGroupedStream.count in 1.0.0

Posted by Damian Guy <da...@gmail.com>.
Hi,

That shouldn't be a problem: the innermost store is of type
`KeyValueStore<Bytes, byte[]>`, but the outer store will be
`KeyValueStore<String, Long>`.
It should work fine.

Thanks,
Damian
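
To see the layering Damian describes, note that the store a caller gets back
through interactive queries is the outer, application-typed one. A hedged
sketch, assuming `streams` is a running KafkaStreams instance for this
topology and "some-host" is an illustrative key:

    import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

    // the outer store is typed with the application-level String/Long;
    // only the innermost RocksDB layer holds raw Bytes/byte[]
    val store: ReadOnlyKeyValueStore[String, java.lang.Long] =
      streams.store(ACCESS_COUNT_PER_HOST_STORE,
        QueryableStoreTypes.keyValueStore[String, java.lang.Long]())

    val count: java.lang.Long = store.get("some-host")  // null if the key is absent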
