Posted to dev@kafka.apache.org by Shekar Tippur <ct...@gmail.com> on 2017/06/22 16:43:34 UTC

Kafka Streams KStream and KTable join issue

Hello,

I am trying to perform a simple join operation. I am using Kafka 0.10.2.

I have a "raw" topic and a "cache" topic, each with just one partition, in my
local environment.

The KTable has these entries:

{"Joe": {"location": "US", "gender": "male"}}
{"Julie": {"location": "US", "gender": "female"}}
{"Kawasaki": {"location": "Japan", "gender": "male"}}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

I want the join to output:

{"user": "Joe", "custom": {"choice":"vegan","enriched":{"location": "US",
"gender": "male"}}}

I want to take what's in the KTable and add it to the "enriched" section of
the output stream.

I have defined a serde:

// This is the same serde code as in the example.
final TestStreamsSerializer<JsonNode> jsonSerializer = new TestStreamsSerializer();
final TestStreamsDeserialzer<JsonNode> jsonDeserializer = new TestStreamsDeserialzer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
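
(For readers who don't have the example at hand: a JsonNode serializer of this
kind is typically a thin wrapper around a Jackson ObjectMapper. A minimal
sketch follows; the class name mirrors the one above, the body is illustrative
only, and the matching deserializer has the same shape around
mapper.readTree(bytes).)

import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative sketch of a Jackson-backed Serializer for JsonNode values.
public class TestStreamsSerializer<T extends JsonNode> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null; // Kafka treats a null payload as a tombstone
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}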


KStream<String, JsonNode> raw = builder.stream(Serdes.String(), jsonSerde, "raw");
KTable<String, JsonNode> cache = builder.table("cache", "local-cache");

raw.leftJoin(cache,
        (record1, record2) -> record1.get("user") + "-" + record2)
   .to("output");

I am having trouble understanding how to call the join API.
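
For reference, here is a minimal sketch of one way the call could look on
0.10.2: the table gets the same explicit serdes as the stream, and the
ValueJoiner builds the enriched JSON. The topic names and jsonSerde come from
the code above; the ObjectNode handling assumes the values are Jackson
JsonNodes, and it assumes both topics are keyed by the user name (e.g. "Joe"),
since the join matches on the record key. This is a sketch, not the thread's
answer.

// Assumes imports for com.fasterxml.jackson.databind.JsonNode and
// com.fasterxml.jackson.databind.node.ObjectNode.
KStream<String, JsonNode> raw = builder.stream(Serdes.String(), jsonSerde, "raw");
KTable<String, JsonNode> cache = builder.table(Serdes.String(), jsonSerde, "cache", "local-cache");

raw.leftJoin(cache, (event, profile) -> {
        ObjectNode out = (ObjectNode) event.deepCopy();
        if (profile != null) {               // leftJoin: the table side may be null
            // with() returns the "custom" object node, creating it if absent
            out.with("custom").set("enriched", profile);
        }
        return (JsonNode) out;
    })
   .to(Serdes.String(), jsonSerde, "output");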

With the above code, I get this error:

[2017-06-22 09:23:31,836] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-pipe failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.NullPointerException
        at org.rocksdb.RocksDB.put(RocksDB.java:488)
        at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
        at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
        at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

[2017-06-22 09:23:31,849] WARN stream-thread [StreamThread-1] Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread)

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.NullPointerException
        at org.rocksdb.RocksDB.put(RocksDB.java:488)
        ... [identical frames as above, down to StreamThread.runLoop(StreamThread.java:592)]
        ... 1 more

Re: Kafka Streams KStream and KTable join issue

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I thought we drop records with a null key? No?

-Matthias
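
(On the processing path that does match the join's behavior: stream-side
records with a null key are skipped. Making the guard explicit in user code is
cheap; a one-line sketch, reusing the raw stream from the original post. Note
this does not touch the store-restore path Damian mentions below, which is a
separate issue.)

// Sketch: drop null-keyed (or null-valued) records explicitly before the join,
// rather than relying on the library to skip them silently.
KStream<String, JsonNode> keyed = raw.filter((user, event) -> user != null && event != null);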

On 6/23/17 12:25 AM, Damian Guy wrote:
> My guess is that the record doesn't have a key, i.e., the key is null. We
> have a fix for this in 0.11: during restore we will skip records with a
> null key.
> 
> On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax <ma...@confluent.io> wrote:
> 
>> Hi,
>>
>> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>>
>> It's unclear to me how an NPE can occur. It seems to happen within the
>> Streams library. Might be a bug. Not sure atm.
>>
>>
>> -Matthias
>>
>> On 6/22/17 9:43 AM, Shekar Tippur wrote:
>>> [original message and stack trace snipped; see the top of this thread]


Re: Kafka Streams KStream and KTable join issue

Posted by Damian Guy <da...@gmail.com>.
My guess is that the record doesn't have a key, i.e., the key is null. We have
a fix for this in 0.11: during restore we will skip records with a null key.
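
Until 0.11, one possible workaround is to avoid reading the table straight
from the user-written topic: build it from a stream that filters out null
keys, so the store and its Streams-managed changelog never see one. A sketch,
reusing jsonSerde and the topic/store names from the original post:

// Pre-0.11 workaround sketch: filter null keys before materializing the table.
// groupByKey + reduce(latest wins) rebuilds table semantics from the stream.
// Caveat: unlike a real KTable, reduce ignores null values, so deletes
// (tombstones) will not propagate.
KTable<String, JsonNode> cache = builder
        .stream(Serdes.String(), jsonSerde, "cache")
        .filter((user, profile) -> user != null)
        .groupByKey(Serdes.String(), jsonSerde)
        .reduce((oldValue, newValue) -> newValue, "local-cache");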

On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax <ma...@confluent.io> wrote:

> Hi,
>
> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>
> It's unclear to me how an NPE can occur. It seems to happen within the
> Streams library. Might be a bug. Not sure atm.
>
>
> -Matthias
>
> On 6/22/17 9:43 AM, Shekar Tippur wrote:
> > [original message and stack trace snipped; see the top of this thread]

Re: Kafka Streams KStream and KTable join issue

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?

It's unclear to me how an NPE can occur. It seems to happen within the
Streams library. Might be a bug. Not sure atm.


-Matthias
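
(A quick way to confirm which client version is actually on the classpath:
AppInfoParser is the class the Kafka clients use to report their own version.)

// Prints the kafka-clients version found on the classpath, e.g. "0.10.2.1".
System.out.println(org.apache.kafka.common.utils.AppInfoParser.getVersion());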

On 6/22/17 9:43 AM, Shekar Tippur wrote:
> [original message and stack trace snipped; see the top of this thread]