You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/10/09 17:45:51 UTC
[kafka] branch 2.1 updated: KAFKA-7366: Make topic configs
segment.bytes and segment.ms to take effect immediately (#5728)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new acec50e KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
acec50e is described below
commit acec50e19fb202401bca9d18aac60ba19e647ec4
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Oct 9 23:07:54 2018 +0530
KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
Reviewers: Ismael Juma <is...@juma.me.uk> and Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/Log.scala | 22 ++++++++++++++++-
core/src/main/scala/kafka/log/LogSegment.scala | 14 +++++------
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 28 ++++++++++------------
core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogUtils.scala | 3 +--
.../kafka/server/DynamicConfigChangeTest.scala | 27 +++++++++++++++++++++
6 files changed, 68 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 094473a..bc328d7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -146,6 +146,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
}
/**
+ * A class used to hold the parameters required to decide whether or not to roll a log segment.
+ */
+case class RollParams(maxSegmentMs: Long,
+ maxSegmentBytes: Int,
+ maxTimestampInMessages: Long,
+ maxOffsetInMessages: Long,
+ messagesSize: Int,
+ now: Long)
+
+object RollParams {
+ def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
+ new RollParams(config.segmentMs,
+ config.segmentSize,
+ appendInfo.maxTimestamp,
+ appendInfo.lastOffset,
+ messagesSize, now)
+ }
+}
+
+/**
* An append-only log for storing messages.
*
* The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
val maxTimestampInMessages = appendInfo.maxTimestamp
val maxOffsetInMessages = appendInfo.lastOffset
- if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
+ if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " +
s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 80763a8..d910a29 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -45,8 +45,10 @@ import scala.math._
* @param log The file records containing log entries
* @param offsetIndex The offset index
* @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
* @param time The time instance
*/
@nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
- val maxSegmentMs: Long,
- val maxSegmentBytes: Int,
val time: Time) extends Logging {
- def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
- val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
- size > maxSegmentBytes - messagesSize ||
+ def shouldRoll(rollParams: RollParams): Boolean = {
+ val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+ size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
- offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
+ offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
}
def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
- maxSegmentMs = config.segmentMs,
- maxSegmentBytes = config.segmentSize,
time)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 40b6874..353e553 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -38,9 +38,8 @@ class LogSegmentTest {
/* create a segment with the given base offset */
def createSegment(offset: Long,
indexIntervalBytes: Int = 10,
- maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
- val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
+ val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
segments += seg
seg
}
@@ -163,10 +162,10 @@ class LogSegmentTest {
val maxSegmentMs = 300000
val time = new MockTime
- val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ val seg = createSegment(0, time = time)
seg.close()
- val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ val reopened = createSegment(0, time = time)
assertEquals(0, seg.timeIndex.sizeInBytes)
assertEquals(0, seg.offsetIndex.sizeInBytes)
@@ -176,24 +175,21 @@ class LogSegmentTest {
assertFalse(reopened.timeIndex.isFull)
assertFalse(reopened.offsetIndex.isFull)
- assertFalse(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = 100L,
- now = time.milliseconds()))
+ var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+ assertFalse(reopened.shouldRoll(rollParams))
// The segment should not be rolled even if maxSegmentMs has been exceeded
time.sleep(maxSegmentMs + 1)
assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
- assertFalse(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = 100L,
- now = time.milliseconds()))
+ rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+ assertFalse(reopened.shouldRoll(rollParams))
// But we should still roll the segment if we cannot fit the next offset
- assertTrue(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = Int.MaxValue.toLong + 200,
- now = time.milliseconds()))
+ rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds())
+ assertTrue(reopened.shouldRoll(rollParams))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 151c4ed..7728998 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -277,7 +277,7 @@ class LogTest {
override def addSegment(segment: LogSegment): LogSegment = {
val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
- segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
+ segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
minOneMessage: Boolean): FetchDataInfo = {
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala
index eb21895..8652aa5 100644
--- a/core/src/test/scala/unit/kafka/log/LogUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -29,13 +29,12 @@ object LogUtils {
def createSegment(offset: Long,
logDir: File,
indexIntervalBytes: Int = 10,
- maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(Log.logFile(logDir, offset))
val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
- new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+ new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 510c4a3..cabe0a9 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
+ @Test
+ def testDynamicTopicConfigChange() {
+ val tp = new TopicPartition("test", 0)
+ val oldSegmentSize = 1000
+ val logProps = new Properties()
+ logProps.put(SegmentBytesProp, oldSegmentSize.toString)
+ createTopic(tp.topic, 1, 1, logProps)
+ TestUtils.retry(10000) {
+ val logOpt = this.servers.head.logManager.getLog(tp)
+ assertTrue(logOpt.isDefined)
+ assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+ }
+
+ val log = servers.head.logManager.getLog(tp).get
+
+ val newSegmentSize = 2000
+ logProps.put(SegmentBytesProp, newSegmentSize.toString)
+ adminZkClient.changeTopicConfig(tp.topic, logProps)
+ TestUtils.retry(10000) {
+ assertEquals(newSegmentSize, log.config.segmentSize)
+ }
+
+ (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
+ // Verify that the new config is used for all segments
+ assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000))
+ }
+
private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) {
assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
this.servers.head.dynamicConfigHandlers.contains(rootEntityType))