Posted to commits@kafka.apache.org by jj...@apache.org on 2019/05/14 05:54:23 UTC

[kafka] branch trunk updated: MINOR: add docs for KIP-354 KAFKA-7321 (#6724)

This is an automated email from the ASF dual-hosted git repository.

jjkoshy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b86e8c1  MINOR: add docs for KIP-354 KAFKA-7321 (#6724)
b86e8c1 is described below

commit b86e8c1ea908b8fa2576d058700b0ac14b1b4a3e
Author: Xiongqi Wu <xi...@gmail.com>
AuthorDate: Mon May 13 22:54:07 2019 -0700

    MINOR: add docs for KIP-354 KAFKA-7321 (#6724)
    
    MINOR: update documentation for the log cleaner max compaction lag feature (KIP-354) implemented in KAFKA-7321
    
    Author: Xiongqi Wu <xi...@linkedin.com>
    Reviewer: Joel Koshy <jj...@gmail.com>
---
 .../org/apache/kafka/common/config/TopicConfig.java  | 14 +++++++++-----
 core/src/main/scala/kafka/server/KafkaConfig.scala   |  9 +++++++--
 docs/configuration.html                              |  1 +
 docs/design.html                                     | 20 ++++++++++++++------
 4 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index b1be6c8..f912cd2 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -104,6 +104,10 @@ public class TopicConfig {
     public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +
         "uncompacted in the log. Only applicable for logs that are being compacted.";
 
+    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
+    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
+        "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+
     public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
     public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +
         "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
@@ -111,11 +115,11 @@ public class TopicConfig {
         "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
         "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
         "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
-        "space in the log.";
-
-    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
-    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
-        "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+        "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG +
+        " configurations are also specified, then the log compactor considers the log eligible for compaction " +
+        "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
+        "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) the log has had " +
+        "dirty (uncompacted) records for at least the " + MAX_COMPACTION_LAG_MS_CONFIG + " period.";
 
     public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
     public static final String CLEANUP_POLICY_COMPACT = "compact";
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 526e174..2f98d3e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -613,11 +613,16 @@ object KafkaConfig {
   val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " +
   "will allow more log to be cleaned at once but will lead to more hash collisions"
   val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean"
-  val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning"
+  val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to be eligible for cleaning. " +
+    "If the " + LogCleanerMaxCompactionLagMsProp + " or the " + LogCleanerMinCompactionLagMsProp +
+    " configurations are also specified, then the log compactor considers the log eligible for compaction " +
+    "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
+    "records for at least the " + LogCleanerMinCompactionLagMsProp + " duration, or (ii) the log has had " +
+    "dirty (uncompacted) records for at least the " + LogCleanerMaxCompactionLagMsProp + " period."
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
-  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
+  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
diff --git a/docs/configuration.html b/docs/configuration.html
index 06d7585..112c844 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -156,6 +156,7 @@
     <li><code>log.index.interval.bytes</code></li>
     <li><code>log.cleaner.delete.retention.ms</code></li>
     <li><code>log.cleaner.min.compaction.lag.ms</code></li>
+    <li><code>log.cleaner.max.compaction.lag.ms</code></li>
     <li><code>log.cleaner.min.cleanable.ratio</code></li>
     <li><code>log.cleanup.policy</code></li>
     <li><code>log.segment.delete.delay.ms</code></li>
diff --git a/docs/design.html b/docs/design.html
index 8c9b2eb..d389030 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -501,6 +501,7 @@
     <ol>
     <li>Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic's <code>min.compaction.lag.ms</code> can be used to
    guarantee the minimum length of time that must pass after a message is written before it can be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
+    The topic's <code>max.compaction.lag.ms</code> can be used to guarantee the maximum delay between the time a message is written and the time the message becomes eligible for compaction.
     <li>Ordering of messages is always maintained.  Compaction will never re-order messages, just remove some.
     <li>The offset for a message never changes.  It is the permanent identifier for a position in the log.
     <li>Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written.  Additionally, all delete markers for deleted records will be seen, provided
@@ -525,18 +526,25 @@
     The log cleaner is enabled by default. This will start the pool of cleaner threads.
     To enable log cleaning on a particular topic, add the log-specific property
     <pre class="brush: text;"> log.cleanup.policy=compact</pre>
-    
-    The <code>log.cleanup.policy</code> property is a broker configuration setting defined 
-    in the broker's <code>server.properties</code> file; it affects all of the topics 
-    in the cluster that do not have a configuration override in place as documented 
+
+    The <code>log.cleanup.policy</code> property is a broker configuration setting defined
+    in the broker's <code>server.properties</code> file; it affects all of the topics
+    in the cluster that do not have a configuration override in place as documented
     <a href="/documentation.html#brokerconfigs">here</a>.
-    
+
     The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
     <pre class="brush: text;">  log.cleaner.min.compaction.lag.ms</pre>
 
     This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently
     being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
-    
+
+    The log cleaner can be configured to ensure a maximum delay before the uncompacted "head" of the log becomes eligible for compaction.
+    <pre class="brush: text;">  log.cleaner.max.compaction.lag.ms</pre>
+
+    This can be used to prevent logs with a low produce rate from remaining ineligible for compaction for an unbounded duration. If not set, logs that do not exceed <code>min.cleanable.dirty.ratio</code> are not compacted.
+    Note that this compaction deadline is not a hard guarantee since it is still subject to the availability of log cleaner threads and the actual compaction time.
+    You will want to monitor the <code>uncleanable-partitions-count</code>, <code>max-clean-time-secs</code> and <code>max-compaction-delay-secs</code> metrics.
+
     <p>
     Further cleaner configurations are described <a href="/documentation.html#brokerconfigs">here</a>.
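
The eligibility rule documented in this change can be summarized in code. The sketch below is illustrative only: the class and parameter names are hypothetical, and it is not Kafka's actual cleaner implementation, just the two trigger conditions the new doc strings describe.

```java
// Illustrative sketch of the compaction-eligibility rule described in the
// updated min.cleanable.dirty.ratio docs; names are hypothetical, not Kafka's.
public class CompactionEligibility {

    /**
     * @param dirtyRatio         fraction of the log that is dirty (uncompacted)
     * @param minCleanableRatio  min.cleanable.dirty.ratio
     * @param dirtyForMs         how long the log has had dirty records
     * @param minCompactionLagMs min.compaction.lag.ms
     * @param maxCompactionLagMs max.compaction.lag.ms (Long.MAX_VALUE when unset)
     */
    public static boolean isEligible(double dirtyRatio, double minCleanableRatio,
                                     long dirtyForMs, long minCompactionLagMs,
                                     long maxCompactionLagMs) {
        // (i) dirty ratio threshold met AND dirty records present for at least
        //     the min.compaction.lag.ms duration
        boolean ratioTriggered = dirtyRatio >= minCleanableRatio
                && dirtyForMs >= minCompactionLagMs;
        // (ii) dirty records have waited for the max.compaction.lag.ms period,
        //      which forces eligibility regardless of the dirty ratio
        boolean maxLagTriggered = dirtyForMs >= maxCompactionLagMs;
        return ratioTriggered || maxLagTriggered;
    }
}
```

At the topic level, the new setting can be applied with the usual config tooling of this era, e.g. `bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.compaction.lag.ms=604800000`.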