You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/25 14:54:13 UTC
[kafka] branch trunk updated: MINOR: Make
LogCleaner.shouldRetainRecord more readable (#6590)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad4a7c3 MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590)
ad4a7c3 is described below
commit ad4a7c343620bdeb4395ff80b0fae7fbb585df7c
Author: Lysss <ly...@gmail.com>
AuthorDate: Thu Apr 25 22:53:47 2019 +0800
MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590)
Reviewers: Bob Barrett <bo...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7fc70da..d172920 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -731,13 +731,14 @@ private[log] class Cleaner(val id: Int,
if (record.hasKey) {
val key = record.key
val foundOffset = map.get(key)
- /* two cases in which we can get rid of a message:
- * 1) if there exists a message with the same key but higher offset
- * 2) if the message is a delete "tombstone" marker and enough time has passed
+ /* First,the message must have the latest offset for the key
+ * then there are two cases in which we can retain a message:
+ * 1) The message has value
+ * 2) The message doesn't has value but it can't be deleted now.
*/
- val redundant = foundOffset >= 0 && record.offset < foundOffset
- val obsoleteDelete = !retainDeletes && !record.hasValue
- !redundant && !obsoleteDelete
+ val latestOffsetForKey = record.offset() >= foundOffset
+ val isRetainedValue = record.hasValue || retainDeletes
+ latestOffsetForKey && isRetainedValue
} else {
stats.invalidMessage()
false