You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2019/03/30 00:31:00 UTC

[jira] [Resolved] (KAFKA-4799) session timeout during event processing shuts down stream

     [ https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-4799.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.11.0.1

> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
>                 Key: KAFKA-4799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4799
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: kafka streams client running on os x, with docker machine running broker
>            Reporter: Jacob Gur
>            Priority: Critical
>             Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
> 	private <T> IConsumerSubscription buildSubscriptionStream(
> 			Class<T> clazz, Consumer<T> consumer, String group,
> 			Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
> 	{
> 		KStreamBuilder builder = new KStreamBuilder();
> 		KStream<String, String> stream = topicStreamFunc.apply(builder);
> 		stream.foreach((k, v) -> {
> 			try {
> 				T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
> 				consumer.accept(value);
> 				Logger.trace("Consumed message {}", value);
> 			} catch (Throwable th) {
> 				Logger.warn("Error while consuming message", th);
> 			}
> 		});
> 		final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
> 		streams.start();
> 		return streams::close;
> 	}
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor (i.e., inside the foreach lambda) with debugger suspending all threads for perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular KafkaConsumer would. Its partition would be revoked, and then it would get it back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is this a bug?
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)