You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2019/03/30 00:31:00 UTC
[jira] [Resolved] (KAFKA-4799) session timeout during event
processing shuts down stream
[ https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-4799.
----------------------------------
Resolution: Fixed
Fix Version/s: 0.11.0.1
> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
> Key: KAFKA-4799
> URL: https://issues.apache.org/jira/browse/KAFKA-4799
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.1
> Environment: kafka streams client running on os x, with docker machine running broker
> Reporter: Jacob Gur
> Priority: Critical
> Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
> private <T> IConsumerSubscription buildSubscriptionStream(
> Class<T> clazz, Consumer<T> consumer, String group,
> Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
> {
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> stream = topicStreamFunc.apply(builder);
> stream.foreach((k, v) -> {
> try {
> T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
> consumer.accept(value);
> Logger.trace("Consumed message {}", value);
> } catch (Throwable th) {
> Logger.warn("Error while consuming message", th);
> }
> });
> final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
> streams.start();
> return streams::close;
> }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor (i.e., inside the foreach lambda) with debugger suspending all threads for perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular KafkaConsumer would. Its partition would be revoked, and then it would get it back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is this a bug?
> Thanks!
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)