Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/14 05:10:37 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

kkonstantine commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r669274974



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);

Review comment:
       I wonder whether we want to print the actual list of partitions here, since it might be long, and we would print it twice. 
   I see the same pattern applied elsewhere, and I understand the value of listing the partitions explicitly. 
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -412,32 +425,36 @@ private void commitOffsets(long now, boolean closing) {
             return;
         }
 
-        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
+        Collection<TopicPartition> allAssignedTopicPartitions = consumer.assignment();
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(lastCommittedOffsets);
         for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
             final TopicPartition partition = taskProvidedOffsetEntry.getKey();
             final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
-            if (commitableOffsets.containsKey(partition)) {
+            if (committableOffsets.containsKey(partition)) {
                 long taskOffset = taskProvidedOffset.offset();
-                long currentOffset = currentOffsets.get(partition).offset();
+                long currentOffset = offsetsToCommit.get(partition).offset();
                 if (taskOffset <= currentOffset) {
-                    commitableOffsets.put(partition, taskProvidedOffset);
+                    committableOffsets.put(partition, taskProvidedOffset);
                 } else {
                     log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
-                            this, partition, taskProvidedOffset, taskOffset, currentOffset);
+                        this, partition, taskProvidedOffset, taskOffset, currentOffset);
                 }
-            } else {
+            } else if (!allAssignedTopicPartitions.contains(partition)) {
                 log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
-                        this, partition, taskProvidedOffset, consumer.assignment());
+                        this, partition, taskProvidedOffset, allAssignedTopicPartitions);
+            } else {
+                log.debug("{} Ignoring task provided offset {}/{} -- topic partition not requested, requested={}",

Review comment:
       What is a "requested" topic partition?
   Also, the messages above say just `partition`, not `topic partition`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -710,22 +758,35 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, false);
+        }
+
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, true);
+        }
+
+        private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
             if (taskStopped) {
                 log.trace("Skipping partition revocation callback as task has already been stopped");
                 return;
             }
-            log.debug("{} Partitions revoked", WorkerSinkTask.this);
+            log.debug("{} Partitions {}: {}", WorkerSinkTask.this, lost ? "lost" : "revoked", partitions);
+
+            if (partitions.isEmpty())
+                return;
+
             try {
-                closePartitions();
-                sinkTaskMetricsGroup.clearOffsets();
+                closePartitions(partitions, lost);
+                sinkTaskMetricsGroup.clearOffsets(partitions);
             } catch (RuntimeException e) {
                 // The consumer swallows exceptions raised in the rebalance listener, so we need to store
                 // exceptions and rethrow when poll() returns.
                 rebalanceException = e;
             }
 
-            // Make sure we don't have any leftover data since offsets will be reset to committed positions
-            messageBatch.clear();
+            // Make sure we don't have any leftover data since offsets for these partitions will be reset to committed positions
+            messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition())));

Review comment:
       Are you concerned that this is now more expensive, especially in cases where we want to remove everything, as before?
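
One way to limit the cost raised above would be to keep the old bulk `clear()` as a fast path and filter only when some partitions stay assigned. The sketch below is not code from the PR: `Tp` and `Rec` are hypothetical stand-ins for `TopicPartition` and `SinkRecord`, and the `remaining` parameter assumes the caller can tell which partitions are still assigned after the callback.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class RevokedBatchFilter {
    /** Hypothetical stand-in for Kafka's TopicPartition. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) return false;
            Tp other = (Tp) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical stand-in for a buffered SinkRecord. */
    static final class Rec {
        final Tp tp;

        Rec(String topic, int partition) {
            this.tp = new Tp(topic, partition);
        }
    }

    /** Removes buffered records that belong to revoked partitions. */
    static void dropRevoked(List<Rec> batch, Collection<Tp> revoked, Collection<Tp> remaining) {
        // Fast path: nothing stays assigned, so the whole batch can go, as before.
        if (remaining.isEmpty()) {
            batch.clear();
            return;
        }
        // Copy into a HashSet so each lookup is O(1) even if the caller passed a List.
        Set<Tp> revokedSet = new HashSet<>(revoked);
        batch.removeIf(rec -> revokedSet.contains(rec.tp));
    }
}
```

Building the lookup key once per record, or reading it from the record if it is already cached there, avoids allocating a new key object inside the predicate.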

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);
+            workerErrantRecordReporter.awaitFutures(topicPartitions);
+            log.trace("Completed all reported errors for {}", topicPartitions);
         }
 
-        if (currentOffsets.isEmpty())
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = currentOffsets.entrySet().stream()
+            .filter(e -> topicPartitions.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (offsetsToCommit.isEmpty())
             return;
 
         committing = true;
         commitSeqno += 1;
         commitStarted = now;
         sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 
+        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()
+            .filter(e -> offsetsToCommit.containsKey(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
         final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
         try {
-            log.trace("{} Calling task.preCommit with current offsets: {}", this, currentOffsets);
-            taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
+            log.trace("{} Calling task.preCommit with current offsets: {}", this, offsetsToCommit);
+            taskProvidedOffsets = task.preCommit(new HashMap<>(offsetsToCommit));
         } catch (Throwable t) {
             if (closing) {
                 log.warn("{} Offset commit failed during close", this);
-                onCommitCompleted(t, commitSeqno, null);
             } else {
                 log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
                 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
                     log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset());
                     consumer.seek(entry.getKey(), entry.getValue().offset());
                 }
-                currentOffsets = new HashMap<>(lastCommittedOffsets);

Review comment:
       This change means we never create a fresh copy of the offsets map. 
   Is this correct? Is there a risk that offsets keep being added and are never removed?
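
For illustration, a map that is never replaced can still stay bounded if entries are dropped per partition when ownership ends. The sketch below only shows that general idea; whether the PR prunes its maps this way is not visible in this hunk. `OffsetTracker` and its methods are hypothetical names, and partitions are modeled as plain strings instead of `TopicPartition`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class OffsetTracker {
    // Keyed by a partition label such as "topic-0"; the real code would key on TopicPartition.
    private final Map<String, Long> offsets = new HashMap<>();

    void record(String partition, long offset) {
        offsets.put(partition, offset);
    }

    /** Drops entries for partitions that are no longer owned, so the map cannot grow without bound. */
    void onPartitionsRemoved(Collection<String> partitions) {
        offsets.keySet().removeAll(partitions);
    }

    int size() {
        return offsets.size();
    }

    boolean tracks(String partition) {
        return offsets.containsKey(partition);
    }
}
```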

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);
+            workerErrantRecordReporter.awaitFutures(topicPartitions);
+            log.trace("Completed all reported errors for {}", topicPartitions);
         }
 
-        if (currentOffsets.isEmpty())
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = currentOffsets.entrySet().stream()
+            .filter(e -> topicPartitions.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (offsetsToCommit.isEmpty())
             return;
 
         committing = true;
         commitSeqno += 1;
         commitStarted = now;
         sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 
+        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()

Review comment:
       Shadowing a member field this way is not ideal. 
   For example, there is an older usage of this collection below. I assume it is the new local variable that we want to use there, but I can't be 100% sure. 
   
   Should we call this `actuallyCommittedOffsets` or similar?
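
The ambiguity is easy to reproduce in isolation. In the sketch below, which uses hypothetical names and string keys in place of `TopicPartition`, the local variable hides the field for the rest of the method, so an unqualified reference silently reads the filtered copy while other methods still see the full field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ShadowingDemo {
    private final Map<String, Long> lastCommittedOffsets = new HashMap<>();

    ShadowingDemo() {
        lastCommittedOffsets.put("t-0", 1L);
        lastCommittedOffsets.put("t-1", 2L);
    }

    int entriesSeenAfterShadowing(Set<String> toCommit) {
        // The local declaration shadows the field from this point to the end of the method.
        Map<String, Long> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()
            .filter(e -> toCommit.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        // This unqualified name now refers to the filtered local, not the field.
        return lastCommittedOffsets.size();
    }

    int entriesInField() {
        // No local of the same name here, so this reads the field.
        return lastCommittedOffsets.size();
    }
}
```

Giving the local a distinct name removes the need for a reader to check which declaration is in scope at each use.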

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -670,8 +707,7 @@ long getNextCommit() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-            lastCommittedOffsets = new HashMap<>();

Review comment:
       Similar question to the one above. This map now only grows. Is this correct?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -49,7 +54,7 @@
     private final HeaderConverter headerConverter;
 
     // Visible for testing
-    protected final LinkedList<Future<Void>> futures;
+    protected final Map<TopicPartition, Future<Void>> futures;

Review comment:
       If we want this to be a concurrent map, it is probably a good idea to reflect that in the declaration 
   (as you do in the tests below).
   
   ```suggestion
       protected final ConcurrentMap<TopicPartition, Future<Void>> futures;
   ```
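
As a rough illustration of why the declared type matters, the sketch below declares the field as `ConcurrentMap`, so readers of the declaration can see that per-key operations such as `remove` are safe to call from several threads. It is not the PR's `awaitFutures` implementation: `FutureRegistry` and its methods are hypothetical, and partitions are plain strings.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureRegistry {
    // Declaring the interface type documents the thread-safety contract at the field itself.
    final ConcurrentMap<String, Future<Void>> futures = new ConcurrentHashMap<>();

    void track(String partition, Future<Void> future) {
        futures.put(partition, future);
    }

    /** Waits for the tracked future of each given partition and stops tracking it. */
    void awaitAndForget(Collection<String> partitions) {
        for (String partition : partitions) {
            // remove() hands the future to exactly one caller, even under contention.
            Future<Void> future = futures.remove(partition);
            if (future == null) {
                continue;
            }
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                throw new IllegalStateException("Reporting failed for " + partition, e);
            }
        }
    }
}
```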



