Posted to users@kafka.apache.org by Shanthi Nellaiappan <sh...@gmail.com> on 2017/03/22 19:10:58 UTC

Error in running PageViewTypedDemo

I have started exploring the Kafka Streams API. I'm trying to run the
PageViewTypedDemo program, unchanged, locally on a desktop. The current
Kafka version is 0.10.1.0.

With the following inputs from two different consoles,

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
streams-pageview-input

{"user":"1", "page":"22", "timestamp":1435278171111}

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
streams-userprofile-input

{"region":"CA", "timestamp":1435278171139}

The error is

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streams-userprofile-input, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:200)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.apache.kafka.streams.errors.StreamsException: Record key for the source KTable from store name streams-userprofile-store-name should not be null.
    at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:83)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
    ... 2 more


Can someone help? Is there anything else to be done apart from creating the
two topics, streams-pageview-input and streams-userprofile-input?

Re: Error in running PageViewTypedDemo

Posted by Shanthi Nellaiappan <sh...@gmail.com>.
Thanks Michael. I am trying that out. What would be the best way to clear
the values entered for a given input topic? If incorrect values are entered
in a topic due to a typo, it has to be cleared; otherwise, the program
keeps failing while processing the incorrect input.


Re: Error in running PageViewTypedDemo

Posted by Michael Noll <mi...@confluent.io>.
The quickest answer I can give you is to try a similar example [1], where
we provide a driver that generates the required input data for the page
view example.

-Michael



[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
(this is for Confluent 3.2 and Apache Kafka 0.10.2.0)




Re: Error in running PageViewTypedDemo

Posted by Shanthi Nellaiappan <sh...@gmail.com>.
Any example for the above would be appreciated. Thanks


Re: Error in running PageViewTypedDemo

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I guess the console producer inserts the data as a String -- and not as
"binary JSON". Try to use a different serializer to insert the data in the
format expected by Streams.

-Matthias
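To illustrate the failure mode (a toy sketch, not the demo's actual JsonTimestampExtractor code): a JSON-based timestamp extractor can only succeed when the deserialized record value actually carries a timestamp field it understands; any other value type is rejected, which mirrors the IllegalArgumentException reported in this thread.

```python
def extract_timestamp(value):
    """Toy analogue of a JSON timestamp extractor: accept only dict
    values carrying a numeric 'timestamp' field, reject anything else."""
    if isinstance(value, dict) and isinstance(value.get("timestamp"), (int, float)):
        return value["timestamp"]
    raise ValueError(f"cannot recognize the record value {value!r}")

# A well-formed user-profile value extracts cleanly:
print(extract_timestamp({"region": "CA", "timestamp": 1435278171139}))
```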




Re: Error in running PageViewTypedDemo

Posted by Shanthi Nellaiappan <sh...@gmail.com>.
Thanks for the info.
With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as input
for streams-pageview-input and
"2",{"region":"CA","timestamp":1435278177777} as input for
streams-userprofile-input, the following error is shown:
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: JsonTimestampExtractor cannot recognize the record value org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$PageViewByRegion@4764b2e
    at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.extract(JsonTimestampExtractor.java:43)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:105)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Any example of the correct input value would be really appreciated.

Thanks


Re: Error in running PageViewTypedDemo

Posted by Michael Noll <mi...@confluent.io>.
IIRC the PageViewTypedDemo example requires input data where the
username/userId is captured in the keys of messages/records, and further
information in the values of those messages.

The problem you are running into is that, when you are writing your input
data via the console producer, the records you are generating have only
values -- the keys are null because you don't specify any explicitly.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}

And you have the same issue for the other topic, "streams-pageview-input".

To enter keys, you need to add some CLI options to the console producer.

Example:

    $ bin/kafka-console-producer --broker-list localhost:9092 \
                             --topic streams-userprofile-input \
                             --property parse.key=true \
                             --property key.separator=,

    firstUser,firstValue
    secondUser,secondValue

Hope this helps,
Michael
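To make the key.separator format above concrete, here is an illustrative sketch (the field names come from the JSON values shown earlier in the thread, and the key choice is an assumption, not code from the demo) of building key,value lines that could be fed to the console producer:

```python
import json

def make_record(key, value, separator=","):
    """Format one console-producer input line: <key><separator><JSON value>."""
    return f"{key}{separator}{json.dumps(value)}"

# Page-view and user-profile records, both keyed by a hypothetical user id:
print(make_record("user1", {"user": "user1", "page": "index.html", "timestamp": 1435278171111}))
print(make_record("user1", {"region": "CA", "timestamp": 1435278171139}))
```

Note that the key appears before the first separator; since the JSON value itself contains commas, this relies on the separator being parsed only at its first occurrence (which, IIRC, is how the console producer splits the line).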



