Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2016/12/16 10:10:36 UTC

What makes a KStream app exit?

I'm seeing instances where my apps are exiting (gracefully, mind you)
without any obvious errors or cause. I have debug logs from many instances
of this and have yet to find a reason that explains what's happening.

- nothing in the app log
- nothing in /var/log/messages (i.e. not OOM-killed)
- not being closed via /etc/init.d
- nothing in the broker logs

Running 0.10.1.0

example log:

https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing
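One JVM-side mechanism worth ruling out: an exception that escapes a thread's run() method kills only that thread, and unless an uncaught-exception handler is installed nothing loud gets logged -- once the last non-daemon thread is gone, the process exits and it looks exactly like a quiet, "graceful" shutdown. A minimal, Kafka-free sketch of that mechanism (class, thread, and message names here are illustrative only):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class SilentThreadDeath {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Throwable> caught = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // A worker that dies the way a stream thread might: by throwing.
        Thread worker = new Thread(() -> {
            throw new UnsupportedOperationException("illustrative failure");
        }, "worker-1");

        // Without a handler, the thread dies with at most a default stack
        // trace on stderr and the rest of the process simply moves on.
        worker.setUncaughtExceptionHandler((t, e) -> {
            caught.set(e);
            done.countDown();
        });

        worker.start();
        done.await();
        System.out.println(worker.getName() + " died with "
                + caught.get().getClass().getSimpleName());
    }
}
```

In a Streams app the more direct route is KafkaStreams' own setUncaughtExceptionHandler method, which installs a handler on its stream threads, so a thread death at least leaves a clear log line.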

Re: What makes a KStream app exit?

Posted by Guozhang Wang <wa...@gmail.com>.
Re: "UnsupportedOperationException: null org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)": I think this is a known issue that has been
fixed in trunk:

https://github.com/apache/kafka/commit/a4592a18641f84a1983c5fe7e697a8b0ab43eb25
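The shape of that bug is visible in the stack trace elsewhere in this thread: StandbyContextImpl.recordCollector() deliberately throws UnsupportedOperationException (standby tasks do not produce to changelogs), but the store-changelogging path called it anyway while initializing state stores for a standby task. A simplified, illustrative reconstruction of that interaction (the names mirror the trace, but the bodies are not the actual Kafka Streams source):

```java
// Illustrative reconstruction of the failure mode in the stack trace;
// NOT the actual Kafka Streams source code.
interface ProcessorContext {
    Object recordCollector();
}

// Standby tasks do not write to changelog topics, so the standby context
// rejects recordCollector() outright (StandbyContextImpl.java:81 in the trace).
class StandbyContext implements ProcessorContext {
    public Object recordCollector() {
        throw new UnsupportedOperationException();
    }
}

// The change logger asks the context for a collector during state-store
// init -- fine for active tasks, fatal for standby ones.
class StoreChangeLogger {
    StoreChangeLogger(ProcessorContext ctx) {
        Object collector = ctx.recordCollector(); // throws for standby contexts
    }
}

public class StandbyInitBug {
    public static void main(String[] args) {
        try {
            new StoreChangeLogger(new StandbyContext());
        } catch (UnsupportedOperationException e) {
            System.out.println("standby task init failed: UnsupportedOperationException");
        }
    }
}
```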


Guozhang

On Fri, Dec 16, 2016 at 11:53 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I guess. It's bugs, so always hard to be 100% sure.
>
> We know about a null-pointer bug in task assignment/creation -- so I
> assume that's what you see.
>
> -Matthias
>


-- 
-- Guozhang

Re: What makes a KStream app exit?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I guess. It's bugs, so always hard to be 100% sure.

We know about a null-pointer bug in task assignment/creation -- so I
assume that's what you see.

-Matthias

On 12/16/16 11:19 AM, Jon Yeargers wrote:
> And these bugs would cause the behaviors I'm seeing?
> 


Re: What makes a KStream app exit?

Posted by Jon Yeargers <jo...@cedexis.com>.
And these bugs would cause the behaviors I'm seeing?

On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> We just discovered a couple of bugs with regard to standby tasks... Not
> all bug fix PRs got merged yet.
>
> You can try running on trunk to get those fixes. Should only be a few
> days until the fixes get merged.
>
>
> -Matthias
>

Re: What makes a KStream app exit?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
We just discovered a couple of bugs with regard to standby tasks... Not
all bug fix PRs got merged yet.

You can try running on trunk to get those fixes. Should only be a few
days until the fixes get merged.


-Matthias

On 12/16/16 9:10 AM, Jon Yeargers wrote:
> Have started having this issue with another KStream based app. Digging
> through logs I ran across this message:
> 
> When I've seen it before it certainly does kill the application. At the end
> of the SNIP you can see the exit process starting.
> 
> 
> 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating
> new standby task 0_0
> 
> 2016-12-16 17:04:51,507 [StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating
> new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
> 
> 2016-12-16 17:04:51,508 [StreamThread-1] INFO
> o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state
> stores
> 
> 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG
> o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
> committed offsets for partitions: [rtdetail_breakout-0]
> 
> 2016-12-16 17:04:51,819 [StreamThread-1] ERROR
> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> RtDetailBreakoutProcessor failed on partition assignment
> 
> java.lang.UnsupportedOperationException: null
> 
>         at
> org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
> 
>         at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
> 
>         at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
> 
>         at
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
> 
>         at
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> 
>         at
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> 
>         at
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> 
>         at
> org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
> 
>         at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
> 
>         at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> 
>         at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> 
>         at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
> 
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> 
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
> 
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> 
> 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG
> o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
> committed offsets for partitions: [rtdetail_breakout-2,
> rtdetail_breakout-1, rtdetail_breakout-6, rtdetail_breakout-5, rtdetail_breakout_filtered-1]
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
> rtdetail_breakout-2 to the committed offset 1989670807
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
> rtdetail_breakout-1 to the committed offset 1991427117
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
> rtdetail_breakout-6 to the committed offset 1986565752
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
> rtdetail_breakout-5 to the committed offset 1982149459
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
> rtdetail_breakout_filtered-1 to the committed offset 92917
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at
> 54.154.234.110:9092.
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at
> 54.194.192.105:9092.
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at
> 54.171.236.113:9092.
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at
> 54.154.115.41:9092.
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting
> down
> 
> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5,
> 0_6]] and standby tasks [[]]
> 
> On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jo...@cedexis.com>
> wrote:
> 
>> FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
>> called unknowingly.
>>
>> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>             try {
>>                 LOGGER.warn("ShutdownHook");
>>                 kafkaStreams.close();
>>             } catch (Exception e) {
>>                 // ignored
>>             }
>>         }));
>>
>>
>> Ran another test and the app closed after ~40min. The above message
>> appears 3rd from the end (several seconds after the shutdown process has
>> commenced).
>>
>> (attaching log section)
>>
>> This has *got* to be something that I've setup improperly... I just can't
>> seem to see it.
>>
>> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jo...@cedexis.com>
>> wrote:
>>
>>> I'm seeing instances where my apps are exiting (gracefully, mind you)
>>> without any obvious errors or cause. I have debug logs from many instances
>>> of this and have yet to find a reason to explain what's happening.
>>>
>>> - nothing in the app log
>>> - nothing in /var/log/messages (i.e. not OOM-killed)
>>> - not being closed via /etc/init.d
>>> - nothing in the broker logs
>>>
>>> Running 0.10.1.0
>>>
>>> example log:
>>>
>>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0
>>> /view?usp=sharing
>>>
>>
>>
> 


Re: What makes a KStream app exit?

Posted by Jon Yeargers <jo...@cedexis.com>.
Have started having this issue with another KStream-based app. Digging
through logs I ran across this message:

When I've seen it before it certainly does kill the application. At the end
of the SNIP you can see the exit process starting.
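[Editor's side note, not from the thread: the exception below fires while creating *standby* task 0_0, so if the app raises `num.standby.replicas` above its default of 0, setting it back to 0 may sidestep the crash until the fix referenced earlier in the thread is picked up. A minimal sketch of the relevant config, assuming the standard StreamsConfig property names; the broker address is a placeholder:]

```java
import java.util.Properties;

public class StandbyWorkaround {
    // Builds the streams configuration. "num.standby.replicas" is the
    // StreamsConfig key whose default is 0, meaning no standby tasks
    // are ever created.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "RtDetailBreakoutProcessor");
        props.put("bootstrap.servers", "broker:9092"); // placeholder address
        // Workaround sketch: with 0 standby replicas, createStandbyTask is
        // never reached, so this UnsupportedOperationException cannot fire.
        props.put("num.standby.replicas", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("num.standby.replicas"));
    }
}
```

[This only avoids the broken code path; it also gives up standby-replica failover, so it is a stopgap, not a fix.]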


2016-12-16 17:04:51,507 [StreamThread-1] DEBUG
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating
new standby task 0_0

2016-12-16 17:04:51,507 [StreamThread-1] INFO
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating
new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]

2016-12-16 17:04:51,508 [StreamThread-1] INFO
o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state
stores

2016-12-16 17:04:51,508 [StreamThread-1] DEBUG
o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
committed offsets for partitions: [rtdetail_breakout-0]

2016-12-16 17:04:51,819 [StreamThread-1] ERROR
o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
RtDetailBreakoutProcessor failed on partition assignment

java.lang.UnsupportedOperationException: null
        at org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

2016-12-16 17:04:51,820 [StreamThread-1] DEBUG
o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching
committed offsets for partitions: [rtdetail_breakout-2, rtdetail_breakout-1,
rtdetail_breakout-6, rtdetail_breakout-5, rtdetail_breakout_filtered-1]

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-2 to the committed offset 1989670807

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-1 to the committed offset 1991427117

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-6 to the committed offset 1986565752

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout-5 to the committed offset 1982149459

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition
rtdetail_breakout_filtered-1 to the committed offset 92917

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at
54.154.234.110:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at
54.194.192.105:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at
54.171.236.113:9092.

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at
54.154.115.41:9092.

2016-12-16 17:04:51,821 [StreamThread-1] INFO
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting
down

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5,
0_6]] and standby tasks [[]]

On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jo...@cedexis.com>
wrote:

> FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
> called unknowingly.
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>             try {
>                 LOGGER.warn("ShutdownHook");
>                 kafkaStreams.close();
>             } catch (Exception e) {
>                 // ignored
>             }
>         }));
>
>
> Ran another test and the app closed after ~40min. The above message
> appears 3rd from the end (several seconds after the shutdown process has
> commenced).
>
> (attaching log section)
>
> This has *got* to be something that I've setup improperly... I just can't
> seem to see it.
>
> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jo...@cedexis.com>
> wrote:
>
>> I'm seeing instances where my apps are exiting (gracefully, mind you)
>> without any obvious errors or cause. I have debug logs from many instances
>> of this and have yet to find a reason to explain what's happening.
>>
>> - nothing in the app log
>> - nothing in /var/log/messages (i.e. not OOM-killed)
>> - not being closed via /etc/init.d
>> - nothing in the broker logs
>>
>> Running 0.10.1.0
>>
>> example log:
>>
>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0
>> /view?usp=sharing
>>
>
>

Re: What makes a KStream app exit?

Posted by Jon Yeargers <jo...@cedexis.com>.
FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
called unknowingly.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        LOGGER.warn("ShutdownHook");
        kafkaStreams.close();
    } catch (Exception e) {
        // ignored
    }
}));
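[Editor's side note, not from the thread: a shutdown hook only tells you *that* the JVM is exiting, not *why*. The JVM exits normally once the last non-daemon thread finishes, so a stream thread dying from an uncaught exception can look exactly like a graceful shutdown. A plain-Java sketch of the mechanism, with a made-up `runWorker` helper and "boom" exception standing in for a streams failure:]

```java
// Demo: a thread that dies from an uncaught exception makes no noise on its
// own (the default handler only prints to stderr, which is easy to miss).
// Installing an UncaughtExceptionHandler is what surfaces the cause.
public class SilentExitDemo {
    static String runWorker() throws InterruptedException {
        StringBuilder cause = new StringBuilder();
        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom"); // stand-in for a streams failure
        });
        // Without this handler the thread dies quietly and, once no
        // non-daemon threads remain, the JVM exits and shutdown hooks run.
        worker.setUncaughtExceptionHandler(
                (t, e) -> cause.append("thread died: ").append(e.getMessage()));
        worker.start();
        worker.join(); // join() makes the handler's write visible here
        return cause.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWorker());
    }
}
```

[KafkaStreams exposes an analogous `setUncaughtExceptionHandler(...)` for its stream threads; calling it before `start()` and logging the throwable there should reveal the real cause of these "graceful" exits.]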


Ran another test and the app closed after ~40min. The above message appears
3rd from the end (several seconds after the shutdown process has commenced).

(attaching log section)

This has *got* to be something that I've setup improperly... I just can't
seem to see it.

On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jo...@cedexis.com>
wrote:

> I'm seeing instances where my apps are exiting (gracefully, mind you)
> without any obvious errors or cause. I have debug logs from many instances
> of this and have yet to find a reason to explain what's happening.
>
> - nothing in the app log
> - nothing in /var/log/messages (i.e. not OOM-killed)
> - not being closed via /etc/init.d
> - nothing in the broker logs
>
> Running 0.10.1.0
>
> example log:
>
> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/
> view?usp=sharing
>