You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/28 16:23:56 UTC

[kafka] branch 2.3 updated: MINOR: Only send delete request if there are offsets in map (#7256)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f1244e5  MINOR: Only send delete request if there are offsets in map (#7256)
f1244e5 is described below

commit f1244e508d6e25c2ee603578a0c897af235fc93a
Author: Bill Bejeck <bi...@confluent.io>
AuthorDate: Wed Aug 28 12:22:36 2019 -0400

    MINOR: Only send delete request if there are offsets in map (#7256)
    
    Currently on commit streams will attempt to delete offsets from repartition topics. However, if a topology does not have any repartition topics, then the recordsToDelete map will be empty.
    
    This PR adds a check that the recordsToDelete is not empty before executing the AdminClient#deleteRecords() method.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/processor/internals/TaskManager.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..3dc8404 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -449,9 +449,10 @@ public class TaskManager {
             for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
                 recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
             }
-            deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
-
-            log.trace("Sent delete-records request: {}", recordsToDelete);
+            if (!recordsToDelete.isEmpty()) {
+                deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
+                log.trace("Sent delete-records request: {}", recordsToDelete);
+            }
         }
     }