You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/12 03:28:00 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9616: KAFKA-10091: KIP-695: Deterministic semantics for task idling

vvcephei commented on a change in pull request #9616:
URL: https://github.com/apache/kafka/pull/9616#discussion_r541489164



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -331,9 +331,29 @@ public static StreamThread create(final InternalTopologyBuilder builder,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
+        log.info("Creating consumer client");
+        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx);
+        consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+
+        final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        // If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
+        if (!builder.latestResetTopicsPattern().pattern().isEmpty() || !builder.earliestResetTopicsPattern().pattern().isEmpty()) {
+            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+        }
+
+        final int maxPollTimeMs =
+            new InternalConsumerConfig(config.getMainConsumerConfigs("", "", 0))
+                .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+
+        // Heuristically bound the topic-partition metadata staleness at twice the max poll interval.
+        // This should typically mean that we get at least four chances to refresh the metadata before it expires.
+        final long metadataStalenessBoundMs = 2L * maxPollTimeMs;

Review comment:
       Good question. It's four because we "learn" the numIterations in StreamThread so that processing takes no more than 1/2 the max poll interval. If we wait for 2 max poll intervals, then we should have at least 4 calls to poll. If Streams runs out of records to process, we'll call poll sooner, so we're probably calling poll quite a bit more frequently.
   
   Note that if we do get fresher metadata, we overwrite the older copy and use the new information instead. I'm not sure if there's any real benefit in being even stricter. What I was aiming for was rather a bound that we'd probably only hit if something is really wrong, eg talking to the brokers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org