You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/25 14:54:13 UTC

[kafka] branch trunk updated: MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad4a7c3  MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590)
ad4a7c3 is described below

commit ad4a7c343620bdeb4395ff80b0fae7fbb585df7c
Author: Lysss <ly...@gmail.com>
AuthorDate: Thu Apr 25 22:53:47 2019 +0800

    MINOR: Make LogCleaner.shouldRetainRecord more readable (#6590)
    
    Reviewers: Bob Barrett <bo...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/LogCleaner.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7fc70da..d172920 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -731,13 +731,14 @@ private[log] class Cleaner(val id: Int,
     if (record.hasKey) {
       val key = record.key
       val foundOffset = map.get(key)
-      /* two cases in which we can get rid of a message:
-       *   1) if there exists a message with the same key but higher offset
-       *   2) if the message is a delete "tombstone" marker and enough time has passed
+      /* First, the message must have the latest offset for the key;
+       * then there are two cases in which we can retain a message:
+       *   1) The message has a value
+       *   2) The message doesn't have a value but it can't be deleted now.
        */
-      val redundant = foundOffset >= 0 && record.offset < foundOffset
-      val obsoleteDelete = !retainDeletes && !record.hasValue
-      !redundant && !obsoleteDelete
+      val latestOffsetForKey = record.offset() >= foundOffset
+      val isRetainedValue = record.hasValue || retainDeletes
+      latestOffsetForKey && isRetainedValue
     } else {
       stats.invalidMessage()
       false