Posted to commits@kafka.apache.org by ju...@apache.org on 2018/10/09 17:38:04 UTC

[kafka] branch trunk updated: KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0848b78  KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
0848b78 is described below

commit 0848b78881afce5899cea1f10c323249f9f0b8cc
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Oct 9 23:07:54 2018 +0530

    KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
    
    Reviewers: Ismael Juma <is...@juma.me.uk> and Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 22 ++++++++++++++++-
 core/src/main/scala/kafka/log/LogSegment.scala     | 14 +++++------
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 28 ++++++++++------------
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  2 +-
 core/src/test/scala/unit/kafka/log/LogUtils.scala  |  3 +--
 .../kafka/server/DynamicConfigChangeTest.scala     | 27 +++++++++++++++++++++
 6 files changed, 68 insertions(+), 28 deletions(-)
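
Before this patch, LogSegment copied maxSegmentMs and maxSegmentBytes out of
LogConfig into constructor fields, so a dynamic update to segment.ms or
segment.bytes only applied to segments created afterwards. The diffs below
remove those fields and instead pass a RollParams snapshot, built from the
live LogConfig on each append, into shouldRoll. A minimal Scala sketch of the
pattern; the class and field names here are illustrative, not Kafka's:

    // Before: the limit is frozen into the long-lived object at creation,
    // so later config changes are invisible to it.
    final class FrozenSegment(maxBytes: Int) {
      def shouldRoll(size: Int, incoming: Int): Boolean =
        size > maxBytes - incoming
    }

    // After: the caller snapshots the current config on every call, so an
    // updated limit is observed on the very next append.
    final case class Snapshot(maxBytes: Int)
    final class LiveSegment {
      def shouldRoll(size: Int, incoming: Int, s: Snapshot): Boolean =
        size > s.maxBytes - incoming
    }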

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 094473a..bc328d7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -146,6 +146,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
 }
 
 /**
+ * A class used to hold the parameters needed to decide whether to roll a log segment.
+ */
+case class RollParams(maxSegmentMs: Long,
+                      maxSegmentBytes: Int,
+                      maxTimestampInMessages: Long,
+                      maxOffsetInMessages: Long,
+                      messagesSize: Int,
+                      now: Long)
+
+object RollParams {
+  def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
+    new RollParams(config.segmentMs,
+      config.segmentSize,
+      appendInfo.maxTimestamp,
+      appendInfo.lastOffset,
+      messagesSize, now)
+  }
+}
+
+/**
  * An append-only log for storing messages.
  *
  * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
     val maxTimestampInMessages = appendInfo.maxTimestamp
     val maxOffsetInMessages = appendInfo.lastOffset
 
-    if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
+    if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
       debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " +
         s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
         s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 80763a8..d910a29 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -45,8 +45,10 @@ import scala.math._
  * @param log The file records containing log entries
  * @param offsetIndex The offset index
  * @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
  */
 @nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val maxSegmentMs: Long,
-                               val maxSegmentBytes: Int,
                                val time: Time) extends Logging {
 
-  def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
-    val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
-    size > maxSegmentBytes - messagesSize ||
+  def shouldRoll(rollParams: RollParams): Boolean = {
+    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
       (size > 0 && reachedRollMs) ||
-      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
+      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
   }
 
   def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      maxSegmentMs = config.segmentMs,
-      maxSegmentBytes = config.segmentSize,
       time)
   }
 
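The predicate's substance is unchanged: roll when the incoming batch would
push the segment past maxSegmentBytes, when a non-empty segment has outlived
maxSegmentMs (less the roll jitter), when either index is full, or when the
next offset no longer fits as a relative offset. A worked example of the size
clause, with hypothetical numbers:

    object SizeClauseExample extends App {
      val size = 900             // bytes already in the segment
      val maxSegmentBytes = 1000 // current segment.bytes
      val messagesSize = 150     // bytes in the incoming batch
      // Roll if appending would overshoot the limit: 900 > 1000 - 150
      println(size > maxSegmentBytes - messagesSize) // prints "true"
    }
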
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 40b6874..353e553 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -38,9 +38,8 @@ class LogSegmentTest {
   /* create a segment with the given base offset */
   def createSegment(offset: Long,
                     indexIntervalBytes: Int = 10,
-                    maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
+    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
     segments += seg
     seg
   }
@@ -163,10 +162,10 @@ class LogSegmentTest {
 
     val maxSegmentMs = 300000
     val time = new MockTime
-    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val seg = createSegment(0, time = time)
     seg.close()
 
-    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val reopened = createSegment(0, time = time)
     assertEquals(0, seg.timeIndex.sizeInBytes)
     assertEquals(0, seg.offsetIndex.sizeInBytes)
 
@@ -176,24 +175,21 @@ class LogSegmentTest {
     assertFalse(reopened.timeIndex.isFull)
     assertFalse(reopened.offsetIndex.isFull)
 
-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))
 
     // The segment should not be rolled even if maxSegmentMs has been exceeded
     time.sleep(maxSegmentMs + 1)
     assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))
 
     // But we should still roll the segment if we cannot fit the next offset
-    assertTrue(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = Int.MaxValue.toLong + 200,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds())
+    assertTrue(reopened.shouldRoll(rollParams))
   }
 
   @Test
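
The final assertTrue in the hunk above holds because offsets inside a segment
are encoded as Int deltas from the segment's baseOffset, so an offset more
than Int.MaxValue past the base cannot be stored and forces a roll regardless
of the size and time limits. A simplified, hypothetical form of the check
behind canConvertToRelativeOffset:

    object RelativeOffsetExample extends App {
      def fitsAsRelativeOffset(offset: Long, baseOffset: Long): Boolean = {
        val delta = offset - baseOffset
        delta >= 0 && delta <= Int.MaxValue
      }
      // An offset Int.MaxValue + 200 above base 0 cannot be encoded:
      println(fitsAsRelativeOffset(Int.MaxValue.toLong + 200L, baseOffset = 0L)) // false
    }
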
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 151c4ed..7728998 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -277,7 +277,7 @@ class LogTest {
 
         override def addSegment(segment: LogSegment): LogSegment = {
           val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
-            segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
+            segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
 
             override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
                               minOneMessage: Boolean): FetchDataInfo = {
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala
index eb21895..8652aa5 100644
--- a/core/src/test/scala/unit/kafka/log/LogUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -29,13 +29,12 @@ object LogUtils {
   def createSegment(offset: Long,
                     logDir: File,
                     indexIntervalBytes: Int = 10,
-                    maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
 
-    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 510c4a3..cabe0a9 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testDynamicTopicConfigChange() {
+    val tp = new TopicPartition("test", 0)
+    val oldSegmentSize = 1000
+    val logProps = new Properties()
+    logProps.put(SegmentBytesProp, oldSegmentSize.toString)
+    createTopic(tp.topic, 1, 1, logProps)
+    TestUtils.retry(10000) {
+      val logOpt = this.servers.head.logManager.getLog(tp)
+      assertTrue(logOpt.isDefined)
+      assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+    }
+
+    val log = servers.head.logManager.getLog(tp).get
+
+    val newSegmentSize = 2000
+    logProps.put(SegmentBytesProp, newSegmentSize.toString)
+    adminZkClient.changeTopicConfig(tp.topic, logProps)
+    TestUtils.retry(10000) {
+      assertEquals(newSegmentSize, log.config.segmentSize)
+    }
+
+    (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
+    // Verify that the new config is used for all segments
+    assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000))
+  }
+
   private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) {
     assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
                this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
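
For reference, the dynamic change exercised by the new test can be driven from
operator-side code through AdminZkClient, the same path the test uses; the
topic name and segment size below are illustrative:

    import java.util.Properties
    import kafka.zk.AdminZkClient

    // Shrink segment.bytes on a live topic. With this commit the new limit
    // applies to the active segment on its next append; no broker restart
    // and no segment re-creation are needed.
    def shrinkSegments(adminZkClient: AdminZkClient, topic: String): Unit = {
      val props = new Properties()
      props.put("segment.bytes", "2000")
      adminZkClient.changeTopicConfig(topic, props)
    }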