Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/28 16:23:56 UTC
[kafka] branch 2.3 updated: MINOR: Only send delete request if there are offsets in map (#7256)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new f1244e5 MINOR: Only send delete request if there are offsets in map (#7256)
f1244e5 is described below
commit f1244e508d6e25c2ee603578a0c897af235fc93a
Author: Bill Bejeck <bi...@confluent.io>
AuthorDate: Wed Aug 28 12:22:36 2019 -0400
MINOR: Only send delete request if there are offsets in map (#7256)
Currently, on commit, Streams attempts to delete offsets from repartition topics. However, if a topology has no repartition topics, the recordsToDelete map is empty, so there is nothing to delete.
This PR adds a check that recordsToDelete is not empty before calling AdminClient#deleteRecords().
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/processor/internals/TaskManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..3dc8404 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -449,9 +449,10 @@ public class TaskManager {
             for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
                 recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
             }
-            deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
-
-            log.trace("Sent delete-records request: {}", recordsToDelete);
+            if (!recordsToDelete.isEmpty()) {
+                deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
+                log.trace("Sent delete-records request: {}", recordsToDelete);
+            }
         }
     }
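
The guard added in the diff above can be illustrated in isolation. The following is a minimal sketch, not the TaskManager code: RecordDeleter and CountingDeleter are hypothetical stand-ins for the single AdminClient call involved, and String keys stand in for TopicPartition, so that the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class DeleteRecordsGuardSketch {

    /** Hypothetical stand-in for the one AdminClient method used here. */
    interface RecordDeleter {
        void deleteRecords(Map<String, Long> offsetsByPartition);
    }

    /** Hypothetical test double that counts how many requests were sent. */
    static final class CountingDeleter implements RecordDeleter {
        int calls = 0;

        @Override
        public void deleteRecords(final Map<String, Long> offsetsByPartition) {
            calls++;
        }
    }

    /**
     * Builds the map of offsets to delete up to and sends a request only
     * when that map is non-empty. Returns true if a request was sent.
     */
    static boolean maybeDeleteRecords(final RecordDeleter deleter,
                                      final Map<String, Long> purgeableOffsets) {
        final Map<String, Long> recordsToDelete = new HashMap<>(purgeableOffsets);
        if (recordsToDelete.isEmpty()) {
            // No repartition topics: skip the request entirely.
            return false;
        }
        deleter.deleteRecords(recordsToDelete);
        return true;
    }

    public static void main(final String[] args) {
        final CountingDeleter deleter = new CountingDeleter();

        // Topology without repartition topics: nothing is sent.
        maybeDeleteRecords(deleter, new HashMap<>());

        // Topology with one repartition partition: one request is sent.
        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("app-repartition-0", 42L); // hypothetical partition name
        maybeDeleteRecords(deleter, offsets);

        System.out.println("requests sent: " + deleter.calls);
    }
}
```

Returning a boolean is a choice made for this sketch so the behavior is easy to observe; the actual change instead leaves deleteRecordsResult unassigned when the map is empty.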