Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 21:52:53 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9616: POC: fix task idling

vvcephei commented on a change in pull request #9616:
URL: https://github.com/apache/kafka/pull/9616#discussion_r536403001



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -78,15 +89,147 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    private static class Metadata {
+        private final long lag;
+        private final long receivedTimestamp;
+
+        public Metadata(final long receivedTimestamp, final long lag) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.lag = lag;
+        }
+    }
+
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        fetchedMetadata.put(partition, new Metadata(metadata.receivedTimestamp(), metadata.lag()));
+    }
+
+    public void resetFetchedMetadata() {
+        fetchedMetadata.clear();
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        // Log-level strategy:
+        //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+        //  DEBUG when we are ready for processing and didn't have to enforce processing
+        //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                              "\n\tThere may be out-of-order processing for this task as a result." +
+                              "\n\tBuffered partitions: {}" +
+                              "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;
+        }
+
+        final Set<TopicPartition> queued = new HashSet<>();
+        Map<TopicPartition, Long> enforced = null;
+
+        for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+            final TopicPartition partition = entry.getKey();
+            final RecordQueue queue = entry.getValue();
+
+            // TODO: hardcoded stale metadata at 100s for POC, need to compute it more intelligently
+            fetchedMetadata.computeIfPresent(partition, (tp, m) -> wallClockTime - m.receivedTimestamp > 100_000L ? null : m);

Review comment:
       @guozhangwang, do you have any better ideas for how we can heuristically determine whether the lag is stale?
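The `computeIfPresent` call on the last diff line depends on a `java.util.Map` contract: if the remapping function returns `null`, the entry is removed. The sketch below shows that eviction step in isolation. It assumes a hypothetical `StaleMetadataSketch` class, uses plain `String` partition names in place of `TopicPartition`, and keeps the POC's hardcoded 100-second threshold. It is an illustration only, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class StaleMetadataSketch {
    // Hypothetical stand-in for PartitionGroup.Metadata (receivedTimestamp, lag).
    static final class Metadata {
        final long receivedTimestamp;
        final long lag;

        Metadata(final long receivedTimestamp, final long lag) {
            this.receivedTimestamp = receivedTimestamp;
            this.lag = lag;
        }
    }

    // Same hardcoded staleness threshold as the POC (100 seconds).
    static final long STALE_AFTER_MS = 100_000L;

    // Removes every entry whose metadata is more than STALE_AFTER_MS older than wallClockTime.
    // Returning null from the remapping function makes computeIfPresent remove the key.
    static void evictStale(final Map<String, Metadata> fetched, final long wallClockTime) {
        // Iterate over a copy of the keys so that removals do not invalidate the iteration.
        for (final String partition : new ArrayList<>(fetched.keySet())) {
            fetched.computeIfPresent(partition,
                (tp, m) -> wallClockTime - m.receivedTimestamp > STALE_AFTER_MS ? null : m);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Metadata> fetched = new HashMap<>();
        fetched.put("input-0", new Metadata(0L, 5L));       // received at t = 0 s
        fetched.put("input-1", new Metadata(150_000L, 0L)); // received at t = 150 s

        evictStale(fetched, 200_000L); // evaluate at wall-clock time t = 200 s
        System.out.println(fetched.keySet()); // prints [input-1]
    }
}
```

The comparison is strict, so metadata that is exactly 100 s old is kept. A better staleness heuristic, which is what the review comment asks for, would replace the fixed `STALE_AFTER_MS` constant.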




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org