Posted to commits@kafka.apache.org by jq...@apache.org on 2017/03/28 16:59:49 UTC
[1/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)
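KIP-107 introduces an AdminClient.deleteRecordsBefore() call that advances a
partition's log start offset (its low watermark) so the broker may delete the
records below it. A minimal usage sketch, assuming the broker address and topic
name are placeholders and inferring the call shape from the tests in this patch:

    import kafka.admin.AdminClient
    import org.apache.kafka.common.TopicPartition

    // Delete every record before offset 5 in partition 0 of "my-topic".
    // get() returns a Map[TopicPartition, DeleteRecordsResult]; each result
    // carries the partition's new low watermark, or the error that occurred.
    val client = AdminClient.createSimplePlaintext("localhost:9092")
    val tp = new TopicPartition("my-topic", 0)
    val result = client.deleteRecordsBefore(Map(tp -> 5L)).get()(tp)
    println(s"delete-records result: $result")
    client.close()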
Repository: kafka
Updated Branches:
refs/heads/trunk f3f9a9eaf -> 8b05ad406
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 1e2749f..c9ea05f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -17,16 +17,21 @@
package kafka.api
import java.util.Collections
+import java.util.concurrent.TimeUnit
import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{Before, Test}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
import org.junit.Assert._
+import scala.collection.JavaConverters._
class AdminClientTest extends IntegrationTestHarness with Logging {
@@ -58,18 +63,132 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Before
override def setUp() {
- super.setUp
+ super.setUp()
client = AdminClient.createSimplePlaintext(this.brokerList)
TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
}
+ @After
+ override def tearDown() {
+ client.close()
+ super.tearDown()
+ }
+
@Test
- def testListGroups() {
- consumers.head.subscribe(Collections.singletonList(topic))
+ def testSeekToBeginningAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(0L, consumer.position(tp))
+
+ client.deleteRecordsBefore(Map((tp, 5L))).get()
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(5L, consumer.position(tp))
+
+ client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+ consumer.seekToBeginning(Collections.singletonList(tp))
+ assertEquals(10L, consumer.position(tp))
+ }
+
+ @Test
+ def testConsumeAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ var messageCount = 0
TestUtils.waitUntilTrue(() => {
- consumers.head.poll(0)
- !consumers.head.assignment.isEmpty
- }, "Expected non-empty assignment")
+ messageCount += consumer.poll(0).count()
+ messageCount == 10
+ }, "Expected 10 messages", 3000L)
+
+ client.deleteRecordsBefore(Map((tp, 3L))).get()
+ consumer.seek(tp, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count()
+ messageCount == 7
+ }, "Expected 7 messages", 3000L)
+
+ client.deleteRecordsBefore(Map((tp, 8L))).get()
+ consumer.seek(tp, 1)
+ messageCount = 0
+ TestUtils.waitUntilTrue(() => {
+ messageCount += consumer.poll(0).count()
+ messageCount == 2
+ }, "Expected 2 messages", 3000L)
+ }
+
+ @Test
+ def testLogStartOffsetCheckpoint() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+ for (i <- 0 until serverCount)
+ killBroker(i)
+ restartDeadBrokers()
+
+ client.close()
+ brokerList = TestUtils.bootstrapServers(servers, listenerName)
+ client = AdminClient.createSimplePlaintext(brokerList)
+
+ TestUtils.waitUntilTrue(() => {
+ // Need to retry if leader is not available for the partition
+ client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+ }, "Expected low watermark of the partition to be 5L")
+ }
+
+ @Test
+ def testLogStartOffsetAfterDeleteRecords() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+ for (i <- 0 until serverCount)
+ assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+ }
+
+ @Test
+ def testOffsetsForTimesAfterDeleteRecords() {
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ sendRecords(producers.head, 10, tp)
+ assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+ client.deleteRecordsBefore(Map((tp, 5L))).get()
+ assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+ client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+ assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+ }
+
+ @Test
+ def testDeleteRecordsWithException() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
+ sendRecords(producers.head, 10, tp)
+ // Should get success result
+ assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+ // OffsetOutOfRangeException if offset > high_watermark
+ assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+ // TimeoutException if response is not available within user-specified timeout
+ assertEquals(DeleteRecordsResult(-1L, Errors.REQUEST_TIMED_OUT.exception()), client.deleteRecordsBefore(Map((tp, 5L))).get(0, TimeUnit.MILLISECONDS)(tp))
+
+ val nonExistPartition = new TopicPartition(topic, 3)
+ // LeaderNotAvailableException if user tries to delete records of a non-existent partition
+ assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+ client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+ }
+
+ @Test
+ def testListGroups() {
+ subscribeAndWaitForAssignment(topic, consumers.head)
val groups = client.listAllGroupsFlattened
assertFalse(groups.isEmpty)
@@ -80,11 +199,8 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testListAllBrokerVersionInfo() {
- consumers.head.subscribe(Collections.singletonList(topic))
- TestUtils.waitUntilTrue(() => {
- consumers.head.poll(0)
- !consumers.head.assignment.isEmpty
- }, "Expected non-empty assignment")
+ subscribeAndWaitForAssignment(topic, consumers.head)
+
val brokerVersionInfos = client.listAllBrokerVersionInfo
val brokers = brokerList.split(",")
assertEquals(brokers.size, brokerVersionInfos.size)
@@ -98,11 +214,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testGetConsumerGroupSummary() {
- consumers.head.subscribe(Collections.singletonList(topic))
- TestUtils.waitUntilTrue(() => {
- consumers.head.poll(0)
- !consumers.head.assignment.isEmpty
- }, "Expected non-empty assignment")
+ subscribeAndWaitForAssignment(topic, consumers.head)
val group = client.describeConsumerGroup(groupId)
assertEquals("range", group.assignmentStrategy)
@@ -117,11 +229,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConsumerGroup() {
- consumers.head.subscribe(Collections.singletonList(topic))
- TestUtils.waitUntilTrue(() => {
- consumers.head.poll(0)
- !consumers.head.assignment.isEmpty
- }, "Expected non-empty assignment")
+ subscribeAndWaitForAssignment(topic, consumers.head)
val consumerGroupSummary = client.describeConsumerGroup(groupId)
assertEquals(1, consumerGroupSummary.consumers.get.size)
@@ -133,4 +241,25 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val nonExistentGroup = "non" + groupId
assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
}
+
+ private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+ consumer.subscribe(Collections.singletonList(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(0)
+ !consumer.assignment.isEmpty
+ }, "Expected non-empty assignment")
+ }
+
+ private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+ numRecords: Int,
+ tp: TopicPartition) {
+ val futures = (0 until numRecords).map { i =>
+ val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+ debug(s"Sending this record: $record")
+ producer.send(record)
+ }
+
+ futures.foreach(_.get)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cb4c235..e4cece9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -190,7 +190,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createFetchRequest = {
val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
- partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
+ partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
val version = ApiKeys.FETCH.latestVersion
requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build()
}
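The extra argument threaded through createFetchRequest above reflects the
KIP-107 wire change: a fetch request's per-partition data now carries the
fetcher's log start offset in addition to the fetch offset and max bytes, so a
follower can report its low watermark to the leader. Read positionally, with
the parameter names assumed from that context:

    // fetchOffset = 0, logStartOffset = 0 (new field), maxBytes = 100
    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))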
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 4ec77a1..2198bf2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -251,6 +251,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
restartDeadBrokers()
checkClosedState(dynamicGroup, 0)
checkClosedState(manualGroup, numRecords)
+ adminClient.close()
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 7870485..ae76eb6 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -92,7 +92,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
(tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, null)))
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
}
}
}
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
- quotaManagers.follower) {
+ quotaManagers.follower, metadataCache) {
override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager) =
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index aff3b2f..0c1297f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -43,6 +43,7 @@ object StressTestLog {
val log = new Log(dir = dir,
config = LogConfig(logProperties),
+ logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
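From here on, much of the churn is mechanical: the Log constructor gains a
logStartOffset parameter ahead of recoveryPoint, so every positional call site
picks up an extra 0L. A sketch of the shape these call sites imply (the full
parameter list is not shown in this patch, so treat it as an assumption):

    val log = new Log(dir = logDir,
                      config = LogConfig(logProps),
                      logStartOffset = 0L,  // new in KAFKA-4586: first offset still present in the log
                      recoveryPoint = 0L,
                      scheduler = time.scheduler,
                      time = time)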
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 9c29679..34c6775 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -206,7 +206,7 @@ object TestLinearWriteSpeed {
class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
Utils.delete(dir)
- val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM)
+ val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
def write(): Int = {
log.append(messages, true)
messages.sizeInBytes
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 0b1978c..5f97708 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
val logProps = new Properties()
logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
/*configure broker-side compression */
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
/* append two messages */
log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index f2dbc6e..3e91f96 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -324,6 +324,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
val log = new Log(dir = dir,
LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
+ logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 4d8c836..05d9060 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -151,6 +151,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
val log = new Log(dir = dir,
LogConfig(logProps),
+ logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3690c55..94207ec 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -183,6 +183,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val partitionDir = new File(logDir, "log-0")
val log = new Log(partitionDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -190,7 +191,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
}
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
- new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
private def records(key: Int, value: Int, timestamp: Long) =
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 18c1bbe..38eb94c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -775,7 +775,7 @@ class LogCleanerTest extends JUnitSuite {
messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
- new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c87e927..768c073 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
// create a log
val log = new Log(logDir,
LogConfig(logProps),
+ logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
@@ -134,6 +135,7 @@ class LogTest extends JUnitSuite {
// create a log
val log = new Log(logDir,
LogConfig(logProps),
+ logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
@@ -165,7 +167,7 @@ class LogTest extends JUnitSuite {
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
// create a log
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -181,7 +183,7 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
}
@@ -194,7 +196,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
for(value <- values)
@@ -218,7 +220,7 @@ class LogTest extends JUnitSuite {
def testAppendAndReadWithNonSequentialOffsets() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -243,7 +245,7 @@ class LogTest extends JUnitSuite {
def testReadAtLogGap() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@@ -260,7 +262,7 @@ class LogTest extends JUnitSuite {
def testReadWithMinMessage() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -288,7 +290,7 @@ class LogTest extends JUnitSuite {
def testReadWithTooSmallMaxLength() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -324,7 +326,7 @@ class LogTest extends JUnitSuite {
// set up replica log starting with offset 1024 and with one message (at offset 1024)
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
log.append(TestUtils.singletonRecords(value = "42".getBytes))
assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -355,7 +357,7 @@ class LogTest extends JUnitSuite {
/* create a multipart log with 100 messages */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
timestamp = time.milliseconds))
@@ -393,7 +395,7 @@ class LogTest extends JUnitSuite {
/* this log should roll after every messageset */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -419,7 +421,7 @@ class LogTest extends JUnitSuite {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
@@ -455,7 +457,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
try {
log.append(messageSet)
@@ -482,7 +484,7 @@ class LogTest extends JUnitSuite {
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
try {
log.append(messageSetWithUnkeyedMessage)
@@ -524,7 +526,7 @@ class LogTest extends JUnitSuite {
val maxMessageSize = second.sizeInBytes - 1
val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
// should be able to append the small message
log.append(first)
@@ -550,7 +552,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
val config = LogConfig(logProps)
- var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
timestamp = time.milliseconds + i * 10))
@@ -576,12 +578,12 @@ class LogTest extends JUnitSuite {
assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
}
- log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
+ log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time)
verifyRecoveredLog(log)
log.close()
// test recovery case
- log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
verifyRecoveredLog(log)
log.close()
}
@@ -597,7 +599,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
- val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
val messages = (0 until numMessages).map { i =>
MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -621,7 +623,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
- var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
val indexFiles = log.logSegments.map(_.index.file)
@@ -633,7 +635,7 @@ class LogTest extends JUnitSuite {
timeIndexFiles.foreach(_.delete())
// reopen the log
- log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -660,7 +662,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
val config = LogConfig(logProps)
- var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -670,7 +672,7 @@ class LogTest extends JUnitSuite {
timeIndexFiles.foreach(_.delete())
// The rebuilt time index should be empty
- log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time)
+ log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time)
val segArray = log.logSegments.toArray
for (i <- 0 until segArray.size - 1) {
assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -691,7 +693,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
- var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
val indexFiles = log.logSegments.map(_.index.file)
@@ -713,7 +715,7 @@ class LogTest extends JUnitSuite {
}
// reopen the log
- log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
+ log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -739,7 +741,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// create a log
- val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (_ <- 1 to msgPerSeg)
@@ -794,7 +796,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
val config = LogConfig(logProps)
- val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+ val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i <- 1 to msgPerSeg)
@@ -838,6 +840,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = new Log(logDir,
LogConfig(logProps),
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -869,6 +872,7 @@ class LogTest extends JUnitSuite {
// create a log
var log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -879,6 +883,7 @@ class LogTest extends JUnitSuite {
log.close()
log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -904,6 +909,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -944,6 +950,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
var log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -958,6 +965,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -968,6 +976,7 @@ class LogTest extends JUnitSuite {
def testAppendMessageWithNullPayload() {
val log = new Log(logDir,
LogConfig(),
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -981,6 +990,7 @@ class LogTest extends JUnitSuite {
def testAppendWithOutOfOrderOffsetsThrowsException() {
val log = new Log(logDir,
LogConfig(),
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -994,6 +1004,7 @@ class LogTest extends JUnitSuite {
def testAppendWithNoTimestamp(): Unit = {
val log = new Log(logDir,
LogConfig(),
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -1016,6 +1027,7 @@ class LogTest extends JUnitSuite {
logDir.mkdirs()
var log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -1030,7 +1042,7 @@ class LogTest extends JUnitSuite {
TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
// attempt recovery
- log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
+ log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1058,6 +1070,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -1106,6 +1119,7 @@ class LogTest extends JUnitSuite {
// create a log and write some messages to it
var log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -1116,7 +1130,7 @@ class LogTest extends JUnitSuite {
// check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
// clean shutdown file exists.
recoveryPoint = log.logEndOffset
- log = new Log(logDir, config, 0L, time.scheduler, time)
+ log = new Log(logDir, config, 0L, 0L, time.scheduler, time)
assertEquals(recoveryPoint, log.logEndOffset)
cleanShutdownFile.delete()
}
@@ -1205,6 +1219,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
@@ -1226,6 +1241,31 @@ class LogTest extends JUnitSuite {
assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
}
+ @Test
+ def testLogDeletionAfterDeleteRecords() {
+ val set = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(set.sizeInBytes)
+
+ for (_ <- 0 until 15)
+ log.append(set)
+ assertEquals("should have 3 segments", 3, log.numberOfSegments)
+ assertEquals(log.logStartOffset, 0)
+
+ log.maybeIncrementLogStartOffset(1)
+ log.deleteOldSegments()
+ assertEquals("should have 3 segments", 3, log.numberOfSegments)
+ assertEquals(log.logStartOffset, 1)
+
+ log.maybeIncrementLogStartOffset(6)
+ log.deleteOldSegments()
+ assertEquals("should have 2 segments", 2, log.numberOfSegments)
+ assertEquals(log.logStartOffset, 6)
+
+ log.maybeIncrementLogStartOffset(15)
+ log.deleteOldSegments()
+ assertEquals("should have 1 segments", 1, log.numberOfSegments)
+ assertEquals(log.logStartOffset, 15)
+ }
@Test
def shouldDeleteSizeBasedSegments() {
@@ -1324,6 +1364,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
+ logStartOffset = 0L,
recoveryPoint = 0L,
time.scheduler,
time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 581a917..3babfc8 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -121,7 +121,7 @@ class AbstractFetcherThreadTest {
fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
- new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
+ new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
}
@@ -199,7 +199,7 @@ class AbstractFetcherThreadTest {
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// Add backoff delay check
if (partitionFetchState.isActive)
- requestMap.put(topicPartition, partitionFetchState.offset)
+ requestMap.put(topicPartition, partitionFetchState.fetchOffset)
}
new DummyFetchRequest(requestMap)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index b350732..eefb35e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -62,7 +62,7 @@ class FetchRequestTest extends BaseRequestTest {
offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
topicPartitions.foreach { tp =>
- partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), maxPartitionBytes))
+ partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
}
partitionMap
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 948b5ec..55cfa27 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -60,7 +60,8 @@ class HighwatermarkPersistenceTest {
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
- logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+ logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+ new MetadataCache(configs.head.brokerId))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -104,7 +105,8 @@ class HighwatermarkPersistenceTest {
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
- scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+ scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+ new MetadataCache(configs.head.brokerId))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 348bfc3..3898d2b 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -53,7 +53,7 @@ class IsrExpirationTest {
@Before
def setUp() {
replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
- QuotaFactory.instantiate(configs.head, metrics, time).follower)
+ QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
}
@After
@@ -78,7 +78,9 @@ class IsrExpirationTest {
for (replica <- partition0.assignedReplicas - leaderReplica)
replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
hw = 15L,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = 15L,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
@@ -130,7 +132,9 @@ class IsrExpirationTest {
for (replica <- partition0.assignedReplicas - leaderReplica)
replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
hw = 10L,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = 15L,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
@@ -144,7 +148,9 @@ class IsrExpirationTest {
(partition0.assignedReplicas - leaderReplica).foreach { r =>
r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
hw = 11L,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = 15L,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
}
@@ -161,7 +167,9 @@ class IsrExpirationTest {
(partition0.assignedReplicas - leaderReplica).foreach { r =>
r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
hw = 15L,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = 15L,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
}
@@ -185,7 +193,9 @@ class IsrExpirationTest {
for (replica <- partition.assignedReplicas - leaderReplica)
replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
hw = 0L,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = 0L,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
// set the leader and its hw and the hw update time
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c89e626..c27f8c5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -587,6 +587,7 @@ class KafkaConfigTest {
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+ case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
@@ -603,6 +604,7 @@ class KafkaConfigTest {
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+ case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9386a1d..f65884e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import java.io.File
import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}
+import java.lang.{Long => JLong}
import kafka.admin.AdminUtils
import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
@@ -31,10 +32,13 @@ import kafka.utils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.utils.{Time, Utils}
import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
class LogOffsetTest extends ZooKeeperTestHarness {
val random = new Random()
@@ -76,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
}
@Test
- def testGetOffsetsBeforeLatestTime() {
+ def testGetOffsetsAfterDeleteRecords() {
val topicPartition = "kafka-" + 0
val topic = topicPartition.split("-").head
val part = Integer.valueOf(topicPartition.split("-").last).intValue
@@ -93,6 +97,40 @@ class LogOffsetTest extends ZooKeeperTestHarness {
log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
log.flush()
+ log.maybeIncrementLogStartOffset(3)
+ log.deleteOldSegments()
+
+ val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+ assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
+ replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
+ }
+
+ @Test
+ def testGetOffsetsBeforeLatestTime() {
+ val topicPartition = "kafka-" + 0
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // setup brokers in zookeeper as owners of partitions for this test
+ AdminUtils.createTopic(zkUtils, topic, 1, 1)
+
+ val logManager = server.getLogManager
+ waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
+ "Log for partition [topic,0] should be created")
+ val log = logManager.getLog(new TopicPartition(topic, part)).get
+
+ for (_ <- 0 until 20)
+ log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+ log.flush()
+
val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f856e52..a720a6a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -40,7 +40,7 @@ class ReplicaManagerQuotasTest {
val record = new SimpleRecord("some-data-in-a-message".getBytes())
val topicPartition1 = new TopicPartition("test-topic", 1)
val topicPartition2 = new TopicPartition("test-topic", 2)
- val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 100), topicPartition2 -> new PartitionData(0, 100))
+ val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100))
var replicaManager: ReplicaManager = null
@Test
@@ -147,6 +147,7 @@ class ReplicaManagerQuotasTest {
//Create log which handles both a regular read and a 0 bytes read
val log = createMock(classOf[Log])
+ expect(log.logStartOffset).andReturn(0L).anyTimes()
expect(log.logEndOffset).andReturn(20L).anyTimes()
expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
@@ -173,7 +174,7 @@ class ReplicaManagerQuotasTest {
replay(logManager)
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+ new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
//create the two replicas
for ((p, _) <- fetchInfo) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d48e4f2..e00c142 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
partition.getOrCreateReplica(1)
@@ -83,7 +83,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
partition.getOrCreateReplica(1)
@@ -100,7 +100,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName))
try {
def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -127,8 +127,12 @@ class ReplicaManagerTest {
val logProps = new Properties()
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+ val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+ val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+ EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache)
try {
var produceCallbackFired = false
@@ -145,11 +149,6 @@ class ReplicaManagerTest {
fetchCallbackFired = true
}
- val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
- val metadataCache = EasyMock.createMock(classOf[MetadataCache])
- EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
- EasyMock.replay(metadataCache)
-
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
@@ -159,7 +158,7 @@ class ReplicaManagerTest {
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+ rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
// Append a message.
@@ -178,14 +177,14 @@ class ReplicaManagerTest {
fetchMinBytes = 100000,
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 100000)),
+ fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
responseCallback = fetchCallback)
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
+ rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => {})
assertTrue(produceCallbackFired)
assertTrue(fetchCallbackFired)
@@ -203,13 +202,16 @@ class ReplicaManagerTest {
val logProps = new Properties()
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+ val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(2, "host2", 2))
+ val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+ EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
+ EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+ EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
+ EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
try {
- val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
- val metadataCache = EasyMock.createMock(classOf[MetadataCache])
- EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
- EasyMock.replay(metadataCache)
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
@@ -221,7 +223,7 @@ class ReplicaManagerTest {
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
- rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+ rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
@@ -251,7 +253,7 @@ class ReplicaManagerTest {
fetchMinBytes = 0,
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+ fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
responseCallback = fetchCallback)
@@ -267,7 +269,7 @@ class ReplicaManagerTest {
fetchMinBytes = 0,
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
- fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+ fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
responseCallback = fetchCallback)
assertTrue(fetchCallbackFired)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index f33c73a..0129d5d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -59,7 +59,7 @@ class SimpleFetchTest {
val partitionId = 0
val topicPartition = new TopicPartition(topic, partitionId)
- val fetchInfo = Seq(topicPartition -> new PartitionData(0, fetchSize))
+ val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize))
var replicaManager: ReplicaManager = null
@@ -75,6 +75,7 @@ class SimpleFetchTest {
// create the log which takes read with either HW max offset or none max offset
val log = EasyMock.createMock(classOf[Log])
+ EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
@@ -96,7 +97,7 @@ class SimpleFetchTest {
// create the replica manager
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+ new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
// add the partition with two replicas, both in ISR
val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
@@ -111,7 +112,9 @@ class SimpleFetchTest {
val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
hw = leo.messageOffset,
+ leaderLogStartOffset = 0L,
leaderLogEndOffset = leo.messageOffset,
+ followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1))
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1ffc7c3..9ae7195 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -934,7 +934,8 @@ object TestUtils extends Logging {
cleanerConfig = cleanerConfig,
ioThreads = 4,
flushCheckMs = 1000L,
- flushCheckpointMs = 10000L,
+ flushRecoveryOffsetCheckpointMs = 10000L,
+ flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
time = time,
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ec29da7..109097c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,6 +65,12 @@
producer's <code>batch.size</code> configuration.</li>
</ul>
+<h5><a id="upgrade_1100_new_protocols" href="#upgrade_1100_new_protocols">New Protocol Versions</a></h5>
+<ul>
+ <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchRequest v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+ <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchResponse v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+</ul>
+
<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
<p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading.
[3/3] kafka git commit: KAFKA-4586;
Add purgeDataBefore() API (KIP-107)
Posted by jq...@apache.org.
KAFKA-4586; Add purgeDataBefore() API (KIP-107)
Author: Dong Lin <li...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>
Closes #2476 from lindong28/KAFKA-4586
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b05ad40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b05ad40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b05ad40
Branch: refs/heads/trunk
Commit: 8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6
Parents: f3f9a9e
Author: Dong Lin <li...@gmail.com>
Authored: Tue Mar 28 09:59:44 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Mar 28 09:59:44 2017 -0700
----------------------------------------------------------------------
bin/kafka-delete-records.sh | 17 ++
.../clients/consumer/internals/Fetcher.java | 4 +-
.../consumer/internals/RequestFuture.java | 36 ++--
.../kafka/common/network/DualSocketChannel.java | 4 +
.../apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../apache/kafka/common/protocol/Protocol.java | 112 +++++++++++-
.../apache/kafka/common/record/FileRecords.java | 15 +-
.../kafka/common/requests/AbstractRequest.java | 3 +
.../kafka/common/requests/AbstractResponse.java | 2 +
.../common/requests/DeleteRecordsRequest.java | 151 +++++++++++++++
.../common/requests/DeleteRecordsResponse.java | 135 ++++++++++++++
.../kafka/common/requests/FetchRequest.java | 30 +--
.../kafka/common/requests/FetchResponse.java | 35 ++--
.../clients/consumer/KafkaConsumerTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 16 +-
.../main/scala/kafka/admin/AdminClient.scala | 183 +++++++++++++++++--
.../kafka/admin/BrokerApiVersionsCommand.scala | 1 +
.../kafka/admin/DeleteRecordsCommand.scala | 117 ++++++++++++
core/src/main/scala/kafka/api/ApiVersion.scala | 10 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 58 +++++-
core/src/main/scala/kafka/cluster/Replica.scala | 34 +++-
.../kafka/consumer/ConsumerFetcherThread.scala | 3 +-
core/src/main/scala/kafka/log/Log.scala | 98 +++++++---
core/src/main/scala/kafka/log/LogManager.scala | 56 +++++-
core/src/main/scala/kafka/log/LogSegment.scala | 33 ++--
.../kafka/server/AbstractFetcherThread.scala | 26 +--
.../kafka/server/DelayedDeleteRecords.scala | 129 +++++++++++++
.../main/scala/kafka/server/DelayedFetch.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 67 ++++++-
.../main/scala/kafka/server/KafkaConfig.scala | 10 +
.../main/scala/kafka/server/KafkaServer.scala | 5 +-
.../main/scala/kafka/server/MetadataCache.scala | 6 +
.../kafka/server/ReplicaFetcherThread.scala | 15 +-
.../scala/kafka/server/ReplicaManager.scala | 155 ++++++++++++++--
.../integration/kafka/api/AdminClientTest.scala | 179 +++++++++++++++---
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../kafka/api/ConsumerBounceTest.scala | 1 +
.../ReplicaFetcherThreadFatalErrorTest.scala | 4 +-
.../test/scala/other/kafka/StressTestLog.scala | 1 +
.../other/kafka/TestLinearWriteSpeed.scala | 2 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 1 +
.../log/LogCleanerLagIntegrationTest.scala | 1 +
.../unit/kafka/log/LogCleanerManagerTest.scala | 3 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 97 +++++++---
.../server/AbstractFetcherThreadTest.scala | 4 +-
.../unit/kafka/server/FetchRequestTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 6 +-
.../unit/kafka/server/ISRExpirationTest.scala | 12 +-
.../unit/kafka/server/KafkaConfigTest.scala | 2 +
.../scala/unit/kafka/server/LogOffsetTest.scala | 40 +++-
.../kafka/server/ReplicaManagerQuotasTest.scala | 5 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 42 +++--
.../unit/kafka/server/SimpleFetchTest.scala | 7 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
docs/upgrade.html | 6 +
59 files changed, 1732 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/bin/kafka-delete-records.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh
new file mode 100755
index 0000000..8726f91
--- /dev/null
+++ b/bin/kafka-delete-records.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
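For reference, a minimal sketch of driving the new command from Java, the same way the shell wrapper above does via kafka-run-class.sh. This is illustrative only and assumes the --bootstrap-server and --offset-json-file flags exposed by kafka.admin.DeleteRecordsCommand; the broker address, file path and JSON contents are placeholders.

    // Illustrative only (not part of this patch). The offset JSON file is
    // expected to look like:
    //   {"partitions": [{"topic": "my-topic", "partition": 0, "offset": 5}], "version": 1}
    public class DeleteRecordsExample {
        public static void main(String[] args) {
            // DeleteRecordsCommand is a Scala object, so its main is callable as a static method.
            kafka.admin.DeleteRecordsCommand.main(new String[] {
                "--bootstrap-server", "localhost:9092",
                "--offset-json-file", "/tmp/delete-records-offsets.json"
            });
        }
    }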
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7236653..c2456cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -200,7 +200,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
- long fetchOffset = request.fetchData().get(partition).offset;
+ long fetchOffset = request.fetchData().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
@@ -722,7 +722,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
}
long position = this.subscriptions.position(partition);
- fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
+ fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize));
log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
} else {
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 2b7c8f3..8515c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -18,7 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
@@ -46,6 +47,7 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
private static final Object INCOMPLETE_SENTINEL = new Object();
private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
+ private final CountDownLatch completedLatch = new CountDownLatch(1);
/**
* Check whether the response is ready to be handled
@@ -55,6 +57,10 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
return result.get() != INCOMPLETE_SENTINEL;
}
+ public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
+ return completedLatch.await(timeout, unit);
+ }
+
/**
* Get the value corresponding to this request (only available if the request succeeded)
* @return the value set in {@link #complete(Object)}
@@ -112,12 +118,16 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
* @throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
*/
public void complete(T value) {
- if (value instanceof RuntimeException)
- throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
+ try {
+ if (value instanceof RuntimeException)
+ throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
- if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
- throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
- fireSuccess();
+ if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
+ throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+ fireSuccess();
+ } finally {
+ completedLatch.countDown();
+ }
}
/**
@@ -127,13 +137,17 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
* @throws IllegalStateException if the future has already been completed
*/
public void raise(RuntimeException e) {
- if (e == null)
- throw new IllegalArgumentException("The exception passed to raise must not be null");
+ try {
+ if (e == null)
+ throw new IllegalArgumentException("The exception passed to raise must not be null");
- if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
- throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+ if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
+ throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
- fireFailure();
+ fireFailure();
+ } finally {
+ completedLatch.countDown();
+ }
}
/**
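The pattern above - counting down completedLatch in a finally block so that awaitDone() can never hang even when complete() or raise() throws - can be shown with a small self-contained sketch. The names here are illustrative, not Kafka code:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    // Minimal sketch of the latch-in-finally completion pattern.
    public class MiniFuture<T> {
        private static final Object INCOMPLETE = new Object();
        private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE);
        private final CountDownLatch done = new CountDownLatch(1);

        public void complete(T value) {
            try {
                if (!result.compareAndSet(INCOMPLETE, value))
                    throw new IllegalStateException("already completed");
            } finally {
                done.countDown(); // waiters wake up even if the check above threw
            }
        }

        public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }
    }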
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
new file mode 100644
index 0000000..411dd50
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
@@ -0,0 +1,4 @@
+package org.apache.kafka.common.network;
+
+public class DualSocketChannel {
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b3c59a1..89b2000 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -45,7 +45,8 @@ public enum ApiKeys {
SASL_HANDSHAKE(17, "SaslHandshake"),
API_VERSIONS(18, "ApiVersions"),
CREATE_TOPICS(19, "CreateTopics"),
- DELETE_TOPICS(20, "DeleteTopics");
+ DELETE_TOPICS(20, "DeleteTopics"),
+ DELETE_RECORDS(21, "DeleteRecords");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 8c3e08c..5d7004a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -492,11 +492,31 @@ public class Protocol {
INT32,
"Maximum bytes to fetch."));
+ // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+ public static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("fetch_offset",
+ INT64,
+ "Message offset."),
+ new Field("log_start_offset",
+ INT64,
+ "Earliest available offset of the follower replica. " +
+ "The field is only used when request is sent by follower. "),
+ new Field("max_bytes",
+ INT32,
+ "Maximum bytes to fetch."));
+
public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
new Field("partitions",
new ArrayOf(FETCH_REQUEST_PARTITION_V0),
"Partitions to fetch."));
+ public static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(new Field("topic", STRING, "Topic to fetch."),
+ new Field("partitions",
+ new ArrayOf(FETCH_REQUEST_PARTITION_V5),
+ "Partitions to fetch."));
+
public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
INT32,
"Broker id of the follower. For normal consumers, use -1."),
@@ -565,6 +585,34 @@ public class Protocol {
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch in the order provided."));
+ // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
+ public static final Schema FETCH_REQUEST_V5 = new Schema(
+ new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ INT32,
+ "Minimum bytes to accumulate in the response."),
+ new Field("max_bytes",
+ INT32,
+ "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+ "if the first message in the first non-empty partition of the fetch is larger than this " +
+ "value, the message will still be returned to ensure that progress can be made."),
+ new Field("isolation_level",
+ INT8,
+ "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+ "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+ "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+ "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+ "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+ "consumers to discard ABORTED transactional records"),
+ new Field("topics",
+ new ArrayOf(FETCH_REQUEST_TOPIC_V5),
+ "Topics to fetch in the order provided."));
+
public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -602,6 +650,8 @@ public class Protocol {
new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
new Field("first_offset", INT64, "The first offset in the aborted transaction"));
+ public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
+
public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
new Field("partition",
INT32,
@@ -617,14 +667,41 @@ public class Protocol {
new Field("aborted_transactions",
ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
+ // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+ public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
+ new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."),
+ new Field("last_stable_offset",
+ INT64,
+ "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+ "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+ new Field("log_start_offset",
+ INT64,
+ "Earliest available offset."),
+ new Field("aborted_transactions",
+ ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
+
public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
new Field("record_set", RECORDS));
+ public static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
+ new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V5),
+ new Field("record_set", RECORDS));
+
public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
new Field("topic", STRING),
new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
+ public static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
+ new Field("topic", STRING),
+ new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
+
public static final Schema FETCH_RESPONSE_V4 = new Schema(
new Field("throttle_time_ms",
INT32,
@@ -633,8 +710,16 @@ public class Protocol {
0),
new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
- public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
- public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
+ public static final Schema FETCH_RESPONSE_V5 = new Schema(
+ new Field("throttle_time_ms",
+ INT32,
+ "Duration in milliseconds for which the request was throttled " +
+ "due to quota violation (zero if the request did not violate any quota).",
+ 0),
+ new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
/* List groups api */
public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -1070,6 +1155,27 @@ public class Protocol {
public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
+ public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+ new Field("offset", INT64, "The offset before which the messages will be deleted."));
+
+ public static final Schema DELETE_RECORDS_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partitions", new ArrayOf(DELETE_RECORDS_REQUEST_PARTITION_V0)));
+
+ public static final Schema DELETE_RECORDS_REQUEST_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)),
+ new Field("timeout", INT32, "The maximum time to await a response in ms."));
+
+ public static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+ new Field("low_watermark", INT64, "Smallest available offset of all live replicas"),
+ new Field("error_code", INT16, "The error code for the given partition."));
+
+ public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
+
+ public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
+ public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1101,6 +1207,7 @@ public class Protocol {
REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
+ REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1123,6 +1230,7 @@ public class Protocol {
RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
+ RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
/* set the minimum and maximum version of each api */
for (ApiKeys api : ApiKeys.values()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index b0dcebf..dcd7845 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -286,24 +286,25 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
/**
- * Search forward for the message whose timestamp is greater than or equals to the target timestamp.
+ * Search forward for the first message that meets the following requirements:
+ * - Message's timestamp is greater than or equal to the targetTimestamp.
+ * - Message's position in the log file is greater than or equal to the startingPosition.
+ * - Message's offset is greater than or equal to the startingOffset.
*
* @param targetTimestamp The timestamp to search for.
* @param startingPosition The starting position to search.
- * @return The timestamp and offset of the message found. None, if no message is found.
+ * @param startingOffset The starting offset to search.
+ * @return The timestamp and offset of the message found. Null if no message is found.
*/
- public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition, long startingOffset) {
for (RecordBatch batch : batchesFrom(startingPosition)) {
if (batch.maxTimestamp() >= targetTimestamp) {
// We found a message
for (Record record : batch) {
long timestamp = record.timestamp();
- if (timestamp >= targetTimestamp)
+ if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
return new TimestampAndOffset(timestamp, record.offset());
}
- throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
- " should contain target timestamp %s but it does not.", batch.maxTimestamp(),
- batch.lastOffset(), targetTimestamp));
}
}
return null;
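In other words, the search now also skips any record whose offset precedes the given starting offset, which matters once deleteRecordsBefore() can move the log start offset past the position implied by the timestamp index. A hedged, self-contained sketch of the same predicate over plain (timestamp, offset) pairs:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch (not Kafka code): return the first entry with
    // timestamp >= targetTimestamp AND offset >= startingOffset, else null.
    public class TimestampSearch {
        static long[] search(List<long[]> records, long targetTimestamp, long startingOffset) {
            for (long[] rec : records) {       // rec[0] = timestamp, rec[1] = offset
                if (rec[0] >= targetTimestamp && rec[1] >= startingOffset)
                    return rec;
            }
            return null;                        // mirrors searchForTimestamp returning null
        }

        public static void main(String[] args) {
            List<long[]> log = Arrays.asList(new long[]{10L, 0L}, new long[]{20L, 1L}, new long[]{30L, 2L});
            // With startingOffset = 2, the entry at offset 1 is skipped even though its timestamp qualifies.
            System.out.println(Arrays.toString(search(log, 15L, 2L)));  // prints [30, 2]
        }
    }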
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7dc3b62..3a99a8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -168,6 +168,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case DELETE_TOPICS:
request = new DeleteTopicsRequest(struct, version);
break;
+ case DELETE_RECORDS:
+ request = new DeleteRecordsRequest(struct, version);
+ break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index d534daf..a5d0dc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -91,6 +91,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new CreateTopicsResponse(struct);
case DELETE_TOPICS:
return new DeleteTopicsResponse(struct);
+ case DELETE_RECORDS:
+ return new DeleteRecordsResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
new file mode 100644
index 0000000..f204c44
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsRequest extends AbstractRequest {
+
+ public static final long HIGH_WATERMARK = -1L;
+
+ // request level key names
+ private static final String TOPICS_KEY_NAME = "topics";
+ private static final String TIMEOUT_KEY_NAME = "timeout";
+
+ // topic level key names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level key names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String OFFSET_KEY_NAME = "offset";
+
+ private final int timeout;
+ private final Map<TopicPartition, Long> partitionOffsets;
+
+ public static class Builder extends AbstractRequest.Builder<DeleteRecordsRequest> {
+ private final int timeout;
+ private final Map<TopicPartition, Long> partitionOffsets;
+
+ public Builder(int timeout, Map<TopicPartition, Long> partitionOffsets) {
+ super(ApiKeys.DELETE_RECORDS);
+ this.timeout = timeout;
+ this.partitionOffsets = partitionOffsets;
+ }
+
+ @Override
+ public DeleteRecordsRequest build(short version) {
+ return new DeleteRecordsRequest(timeout, partitionOffsets, version);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(type=DeleteRecordsRequest")
+ .append(", timeout=").append(timeout)
+ .append(", partitionOffsets=(").append(Utils.mkString(partitionOffsets))
+ .append("))");
+ return builder.toString();
+ }
+ }
+
+
+ public DeleteRecordsRequest(Struct struct, short version) {
+ super(version);
+ partitionOffsets = new HashMap<>();
+ for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicStruct = (Struct) topicStructObj;
+ String topic = topicStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionStruct = (Struct) partitionStructObj;
+ int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+ long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
+ partitionOffsets.put(new TopicPartition(topic, partition), offset);
+ }
+ }
+ timeout = struct.getInt(TIMEOUT_KEY_NAME);
+ }
+
+ public DeleteRecordsRequest(int timeout, Map<TopicPartition, Long> partitionOffsets, short version) {
+ super(version);
+ this.timeout = timeout;
+ this.partitionOffsets = partitionOffsets;
+ }
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.DELETE_RECORDS.requestSchema(version()));
+ Map<String, Map<Integer, Long>> offsetsByTopic = CollectionUtils.groupDataByTopic(partitionOffsets);
+ struct.set(TIMEOUT_KEY_NAME, timeout);
+ List<Struct> topicStructArray = new ArrayList<>();
+ for (Map.Entry<String, Map<Integer, Long>> offsetsByTopicEntry : offsetsByTopic.entrySet()) {
+ Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+ topicStruct.set(TOPIC_KEY_NAME, offsetsByTopicEntry.getKey());
+ List<Struct> partitionStructArray = new ArrayList<>();
+ for (Map.Entry<Integer, Long> offsetsByPartitionEntry : offsetsByTopicEntry.getValue().entrySet()) {
+ Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+ partitionStruct.set(PARTITION_KEY_NAME, offsetsByPartitionEntry.getKey());
+ partitionStruct.set(OFFSET_KEY_NAME, offsetsByPartitionEntry.getValue());
+ partitionStructArray.add(partitionStruct);
+ }
+ topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+ topicStructArray.add(topicStruct);
+ }
+ struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(Throwable e) {
+ Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
+
+ for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+ responseMap.put(entry.getKey(), new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.forException(e)));
+ }
+
+ short versionId = version();
+ switch (versionId) {
+ case 0:
+ return new DeleteRecordsResponse(responseMap);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));
+ }
+ }
+
+ public int timeout() {
+ return timeout;
+ }
+
+ public Map<TopicPartition, Long> partitionOffsets() {
+ return partitionOffsets;
+ }
+
+ public static DeleteRecordsRequest parse(ByteBuffer buffer, short version) {
+ return new DeleteRecordsRequest(ApiKeys.DELETE_RECORDS.parseRequest(version, buffer), version);
+ }
+}
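A short sketch of building the new request with the Builder above; the topic name, timeout and offsets are placeholders. DeleteRecordsRequest.HIGH_WATERMARK (-1) asks the leader to delete everything below the current high watermark.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.DeleteRecordsRequest;

    // Illustrative only: delete records before offset 5 on partition 0 and
    // up to the high watermark on partition 1 of a hypothetical topic.
    public class DeleteRecordsRequestExample {
        public static void main(String[] args) {
            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition("my-topic", 0), 5L);
            offsets.put(new TopicPartition("my-topic", 1), DeleteRecordsRequest.HIGH_WATERMARK);
            DeleteRecordsRequest.Builder builder = new DeleteRecordsRequest.Builder(30000, offsets);
            DeleteRecordsRequest request = builder.build((short) 0);
            System.out.println(builder + " -> " + request.partitionOffsets());
        }
    }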
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
new file mode 100644
index 0000000..45b518b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsResponse extends AbstractResponse {
+
+ public static final long INVALID_LOW_WATERMARK = -1L;
+
+ // request level key names
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level key names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level key names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ private final Map<TopicPartition, PartitionResponse> responses;
+
+ /**
+ * Possible error codes:
+ *
+ * OFFSET_OUT_OF_RANGE (1)
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * REQUEST_TIMED_OUT (7)
+ * NOT_ENOUGH_REPLICAS (19)
+ * UNKNOWN (-1)
+ */
+
+ public static final class PartitionResponse {
+ public long lowWatermark;
+ public Errors error;
+
+ public PartitionResponse(long lowWatermark, Errors error) {
+ this.lowWatermark = lowWatermark;
+ this.error = error;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append('{')
+ .append(",low_watermark: ")
+ .append(lowWatermark)
+ .append("error: ")
+ .append(error.toString())
+ .append('}');
+ return builder.toString();
+ }
+ }
+
+ public DeleteRecordsResponse(Struct struct) {
+ responses = new HashMap<>();
+ for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicStruct = (Struct) topicStructObj;
+ String topic = topicStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionStruct = (Struct) partitionStructObj;
+ int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+ long lowWatermark = partitionStruct.getLong(LOW_WATERMARK_KEY_NAME);
+ Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+ responses.put(new TopicPartition(topic, partition), new PartitionResponse(lowWatermark, error));
+ }
+ }
+ }
+
+ /**
+ * Constructor for version 0.
+ */
+ public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+ this.responses = responses;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+ Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+ List<Struct> topicStructArray = new ArrayList<>();
+ for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+ Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+ topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+ List<Struct> partitionStructArray = new ArrayList<>();
+ for (Map.Entry<Integer, PartitionResponse> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+ Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+ PartitionResponse response = responsesByPartitionEntry.getValue();
+ partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
+ partitionStruct.set(LOW_WATERMARK_KEY_NAME, response.lowWatermark);
+ partitionStruct.set(ERROR_CODE_KEY_NAME, response.error.code());
+ partitionStructArray.add(partitionStruct);
+ }
+ topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+ topicStructArray.add(topicStruct);
+ }
+ struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+ return struct;
+ }
+
+ public Map<TopicPartition, PartitionResponse> responses() {
+ return this.responses;
+ }
+
+ public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
+ return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
+ }
+}
\ No newline at end of file
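And a matching sketch for the response side: iterate responses() and split successes from failures using the per-partition error codes documented above. How the response is obtained from a broker is out of scope here.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.DeleteRecordsResponse;

    // Illustrative only: inspect a DeleteRecordsResponse received from a broker.
    public class DeleteRecordsResponseExample {
        static void printLowWatermarks(DeleteRecordsResponse response) {
            for (Map.Entry<TopicPartition, DeleteRecordsResponse.PartitionResponse> entry : response.responses().entrySet()) {
                DeleteRecordsResponse.PartitionResponse partition = entry.getValue();
                if (partition.error == Errors.NONE)
                    System.out.println(entry.getKey() + " low watermark advanced to " + partition.lowWatermark);
                else
                    System.out.println(entry.getKey() + " failed: " + partition.error);
            }
        }
    }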
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 7c029ca..b843c66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -47,9 +47,11 @@ public class FetchRequest extends AbstractRequest {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+ private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
// default values for older versions where a request level limit did not exist
public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
+ public static final long INVALID_LOG_START_OFFSET = -1L;
private final int replicaId;
private final int maxWait;
@@ -59,17 +61,19 @@ public class FetchRequest extends AbstractRequest {
private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
public static final class PartitionData {
- public final long offset;
+ public final long fetchOffset;
+ public final long logStartOffset;
public final int maxBytes;
- public PartitionData(long offset, int maxBytes) {
- this.offset = offset;
+ public PartitionData(long fetchOffset, long logStartOffset, int maxBytes) {
+ this.fetchOffset = fetchOffset;
+ this.logStartOffset = logStartOffset;
this.maxBytes = maxBytes;
}
@Override
public String toString() {
- return "(offset=" + offset + ", maxBytes=" + maxBytes + ")";
+ return "(offset=" + fetchOffset + ", logStartOffset=" + logStartOffset + ", maxBytes=" + maxBytes + ")";
}
}
@@ -188,7 +192,9 @@ public class FetchRequest extends AbstractRequest {
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
- PartitionData partitionData = new PartitionData(offset, maxBytes);
+ long logStartOffset = partitionResponse.hasField(LOG_START_OFFSET_KEY_NAME) ?
+ partitionResponse.getLong(LOG_START_OFFSET_KEY_NAME) : INVALID_LOG_START_OFFSET;
+ PartitionData partitionData = new PartitionData(offset, logStartOffset, maxBytes);
fetchData.put(new TopicPartition(topic, partition), partitionData);
}
}
@@ -200,7 +206,8 @@ public class FetchRequest extends AbstractRequest {
for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e),
- FetchResponse.INVALID_LSO, FetchResponse.INVALID_HIGHWATERMARK, null, MemoryRecords.EMPTY);
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET,
+ null, MemoryRecords.EMPTY);
responseData.put(entry.getKey(), partitionResponse);
}
return new FetchResponse(responseData, 0);
@@ -240,16 +247,15 @@ public class FetchRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version));
+ Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version()));
List<TopicAndPartitionData<PartitionData>> topicsData = TopicAndPartitionData.batchByTopic(fetchData);
struct.set(REPLICA_ID_KEY_NAME, replicaId);
struct.set(MAX_WAIT_KEY_NAME, maxWait);
struct.set(MIN_BYTES_KEY_NAME, minBytes);
- if (version >= 3)
+ if (struct.hasField(MAX_BYTES_KEY_NAME))
struct.set(MAX_BYTES_KEY_NAME, maxBytes);
- if (version >= 4)
+ if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
List<Struct> topicArray = new ArrayList<>();
@@ -261,7 +267,9 @@ public class FetchRequest extends AbstractRequest {
PartitionData fetchPartitionData = partitionEntry.getValue();
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+ partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.fetchOffset);
+ if (partitionData.hasField(LOG_START_OFFSET_KEY_NAME))
+ partitionData.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
partitionArray.add(partitionData);
}
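To make the two cases concrete: a consumer has no meaningful log start offset and passes the INVALID_LOG_START_OFFSET sentinel (as the Fetcher change earlier in this patch does), while a follower reports its local log start offset so the leader can advance the partition's low watermark. A hedged sketch, with placeholder offsets and sizes:

    import org.apache.kafka.common.requests.FetchRequest;

    // Illustrative only: the same constructor, consumer vs. follower usage.
    public class PartitionDataExample {
        public static void main(String[] args) {
            // Consumer fetch: the log_start_offset field is unused, send the sentinel.
            FetchRequest.PartitionData consumer =
                new FetchRequest.PartitionData(42L, FetchRequest.INVALID_LOG_START_OFFSET, 1048576);
            // Follower fetch: report the replica's own log start offset to the leader.
            FetchRequest.PartitionData follower =
                new FetchRequest.PartitionData(42L, 7L, 1048576);
            System.out.println(consumer + " / " + follower);
        }
    }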
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f0a0eee..56eb838 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -51,6 +51,7 @@ public class FetchResponse extends AbstractResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
+ private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
private static final String RECORD_SET_KEY_NAME = "record_set";
@@ -60,7 +61,8 @@ public class FetchResponse extends AbstractResponse {
private static final int DEFAULT_THROTTLE_TIME = 0;
public static final long INVALID_HIGHWATERMARK = -1L;
- public static final long INVALID_LSO = -1L;
+ public static final long INVALID_LAST_STABLE_OFFSET = -1L;
+ public static final long INVALID_LOG_START_OFFSET = -1L;
/**
* Possible error codes:
@@ -111,19 +113,22 @@ public class FetchResponse extends AbstractResponse {
public static final class PartitionData {
public final Errors error;
- public final long lastStableOffset;
public final long highWatermark;
+ public final long lastStableOffset;
+ public final long logStartOffset;
public final List<AbortedTransaction> abortedTransactions;
public final Records records;
public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
+ long logStartOffset,
List<AbortedTransaction> abortedTransactions,
Records records) {
this.error = error;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
+ this.logStartOffset = logStartOffset;
this.abortedTransactions = abortedTransactions;
this.records = records;
}
@@ -140,6 +145,7 @@ public class FetchResponse extends AbstractResponse {
return error == that.error &&
highWatermark == that.highWatermark &&
lastStableOffset == that.lastStableOffset &&
+ logStartOffset == that.logStartOffset &&
(abortedTransactions == null ? that.abortedTransactions == null : abortedTransactions.equals(that.abortedTransactions)) &&
(records == null ? that.records == null : records.equals(that.records));
}
@@ -147,8 +153,9 @@ public class FetchResponse extends AbstractResponse {
@Override
public int hashCode() {
int result = error != null ? error.hashCode() : 0;
- result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+ result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
+ result = 31 * result + (int) (logStartOffset ^ (logStartOffset >>> 32));
result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
result = 31 * result + (records != null ? records.hashCode() : 0);
return result;
@@ -157,15 +164,16 @@ public class FetchResponse extends AbstractResponse {
@Override
public String toString() {
return "(error=" + error + ", highWaterMark=" + highWatermark +
- ", lastStableOffset = " + lastStableOffset + ", " +
- "abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
+ ", lastStableOffset = " + lastStableOffset +
+ ", logStartOffset = " + logStartOffset +
+ ", abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
}
}
/**
* Constructor for all versions.
*
- * From version 3, the entries in `responseData` should be in the same order as the entries in
+ * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
* `FetchRequest.fetchData`.
*
* @param responseData fetched data grouped by topic-partition
@@ -187,10 +195,13 @@ public class FetchResponse extends AbstractResponse {
int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
- long lastStableOffset = INVALID_LSO;
+ long lastStableOffset = INVALID_LAST_STABLE_OFFSET;
if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);
-
+ long logStartOffset = INVALID_LOG_START_OFFSET;
+ if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+ logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
+
Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
List<AbortedTransaction> abortedTransactions = null;
@@ -207,7 +218,7 @@ public class FetchResponse extends AbstractResponse {
}
}
- PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset,
+ PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset, logStartOffset,
abortedTransactions, records);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
@@ -319,7 +330,7 @@ public class FetchResponse extends AbstractResponse {
partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
- if (version >= 4) {
+ if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) {
partitionDataHeader.set(LAST_STABLE_OFFSET_KEY_NAME, fetchPartitionData.lastStableOffset);
if (fetchPartitionData.abortedTransactions == null) {
@@ -335,6 +346,8 @@ public class FetchResponse extends AbstractResponse {
partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
}
}
+ if (partitionDataHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+ partitionDataHeader.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
@@ -345,7 +358,7 @@ public class FetchResponse extends AbstractResponse {
}
struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
- if (version >= 1)
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME))
struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
return struct;
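On the response side the new field is read back with the same hasField() pattern, defaulting to INVALID_LOG_START_OFFSET when talking to a pre-v5 broker. A sketch of constructing the enlarged PartitionData, with placeholder values:

    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.FetchResponse;

    // Illustrative only: a v5-style partition response carrying all three offsets.
    public class PartitionResponseExample {
        public static void main(String[] args) {
            FetchResponse.PartitionData data = new FetchResponse.PartitionData(
                Errors.NONE,
                100L,                                     // high_watermark
                FetchResponse.INVALID_LAST_STABLE_OFFSET, // last_stable_offset (no transactions here)
                5L,                                       // log_start_offset, new in v5
                null,                                     // aborted_transactions
                MemoryRecords.EMPTY);
            System.out.println(data);
        }
    }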
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7f20472..3dd3983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1467,7 +1467,7 @@ public class KafkaConsumerTest {
builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
records = builder.build();
}
- tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LSO,
+ tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
null, records));
}
return new FetchResponse(tpResponses, 0);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 92150a6..224f83c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -222,7 +222,7 @@ public class FetcherTest {
public boolean matches(AbstractRequest body) {
FetchRequest fetch = (FetchRequest) body;
return fetch.fetchData().containsKey(tp) &&
- fetch.fetchData().get(tp).offset == offset;
+ fetch.fetchData().get(tp).fetchOffset == offset;
}
};
}
@@ -966,7 +966,7 @@ public class FetcherTest {
private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
- new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LSO, null, records));
+ new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8a6d69a..ad7260e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -348,7 +348,7 @@ public class RequestResponseTest {
MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000,
- FetchResponse.INVALID_LSO, null, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
FetchResponse v0Response = new FetchResponse(responseData, 0);
FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -373,11 +373,11 @@ public class RequestResponseTest {
new FetchResponse.AbortedTransaction(15, 50)
);
responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100000,
- FetchResponse.INVALID_LSO, abortedTransactions, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, abortedTransactions, records));
responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 900000,
- 5, null, records));
+ 5, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(Errors.NONE, 70000,
- 6, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
+ 6, FetchResponse.INVALID_LOG_START_OFFSET, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
FetchResponse response = new FetchResponse(responseData, 10);
FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -458,8 +458,8 @@ public class RequestResponseTest {
private FetchRequest createFetchRequest(int version) {
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
- fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
- fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
+ fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));
+ fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L, 1000000));
return FetchRequest.Builder.forConsumer(100, 100000, fetchData).setMaxBytes(1000).build((short) version);
}
@@ -467,12 +467,12 @@ public class RequestResponseTest {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE,
- 1000000, FetchResponse.INVALID_LSO, null, records));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
new FetchResponse.AbortedTransaction(234L, 999L));
responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData(Errors.NONE,
- 1000000, FetchResponse.INVALID_LSO, abortedTransactions, MemoryRecords.EMPTY));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
return new FetchResponse(responseData, 25);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 45ba58b..8a9660b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -12,16 +12,18 @@
*/
package kafka.admin
+import java.io.IOException
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
+import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
import kafka.coordinator.GroupOverview
import kafka.utils.Logging
import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{RequestFutureAdapter, ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.TimeoutException
@@ -32,7 +34,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
@@ -40,18 +42,44 @@ import scala.util.Try
class AdminClient(val time: Time,
val requestTimeoutMs: Int,
- val retryBackoffMs: Int,
+ val retryBackoffMs: Long,
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {
+ @volatile var running: Boolean = true
+ val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
+
+ val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
+ override def run() {
+ try {
+ while (running) {
+ client.poll(Long.MaxValue)
+ }
+ } catch {
+ case t : Throwable =>
+ error("admin-client-network-thread exited", t)
+ } finally {
+ pendingFutures.asScala.foreach { future =>
+ try {
+ future.raise(Errors.UNKNOWN)
+ } catch {
+ case _: IllegalStateException => // It is OK if the future has been completed
+ }
+ }
+ pendingFutures.clear()
+ }
+ }
+ }, true)
+
+ networkThread.start()
+
private def send(target: Node,
api: ApiKeys,
request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
- var future: RequestFuture[ClientResponse] = null
-
- future = client.send(target, request)
- client.poll(future)
-
+ val future: RequestFuture[ClientResponse] = client.send(target, request)
+ pendingFutures.add(future)
+ future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
+ pendingFutures.remove(future)
if (future.succeeded())
future.value().responseBody()
else
@@ -163,6 +191,73 @@ class AdminClient(val time: Time,
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
}.toMap
+ /*
+ * Delete all messages whose offset is smaller than the given offset of the corresponding partition.
+ *
+ * DeleteRecordsResult contains either the lowWatermark of the partition or an exception. The possible
+ * exceptions and their interpretations are listed below:
+ *
+ * - DisconnectException if the leader node of the partition is not available. The user needs to retry.
+ * - PolicyViolationException if the topic is configured as non-deletable.
+ * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic.
+ * - TimeoutException if the response is not available within the timeout specified by either the Future's timeout or the AdminClient's request timeout.
+ * - UnknownTopicOrPartitionException if the partition doesn't exist or the user doesn't have the authority to describe the topic.
+ * - NotLeaderForPartitionException if the broker is not the leader of the partition. The user needs to retry.
+ * - OffsetOutOfRangeException if the offset is larger than the high watermark of this partition.
+ *
+ */
+
+ def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
+ val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava)
+ val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
+ val errors = response.errors
+ if (!errors.isEmpty)
+ error(s"Metadata request contained errors: $errors")
+
+ val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
+ !response.errors().containsKey(partitionAndOffset._1.topic())}
+
+ val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
+ response.cluster().leaderFor(partitionAndOffset._1) != null}
+
+ val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
+ partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
+
+ val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
+ DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
+
+ val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
+ response.cluster().leaderFor(partitionAndOffset._1))
+
+ // prepare requests and generate Future objects
+ val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
+ val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
+ val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
+ pendingFutures.add(future)
+ future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
+ override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+ val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
+ val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
+ future.complete(result)
+ pendingFutures.remove(future)
+ }
+
+ override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+ val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
+ future.complete(result)
+ pendingFutures.remove(future)
+ }
+
+ })
+ }
+
+ // default results if no DeleteRecordsResponse is received before the timeout
+ val defaultResults = offsets.mapValues(_ =>
+ DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
+
+ new CompositeFuture(time, defaultResults, futures.toList)
+ }
+
/**
* Case class used to represent a consumer of a consumer group
*/
@@ -221,11 +316,53 @@ class AdminClient(val time: Time,
}
def close() {
- client.close()
+ running = false
+ try {
+ client.close()
+ } catch {
+ case e: IOException =>
+ error("Exception closing nioSelector:", e)
+ }
}
}
+/*
+ * CompositeFuture assumes that the future objects in the futures list do not raise errors
+ */
+class CompositeFuture[T](time: Time,
+ defaultResults: Map[TopicPartition, T],
+ futures: List[RequestFuture[Map[TopicPartition, T]]]) extends Future[Map[TopicPartition, T]] {
+
+ override def isCancelled = false
+
+ override def cancel(interrupt: Boolean) = false
+
+ override def get(): Map[TopicPartition, T] = {
+ get(Long.MaxValue, TimeUnit.MILLISECONDS)
+ }
+
+ override def get(timeout: Long, unit: TimeUnit): Map[TopicPartition, T] = {
+ val start: Long = time.milliseconds()
+ val timeoutMs = unit.toMillis(timeout)
+ var remaining: Long = timeoutMs
+
+ val observedResults = futures.flatMap{ future =>
+ val elapsed = time.milliseconds() - start
+ remaining = if (timeoutMs - elapsed > 0) timeoutMs - elapsed else 0L
+
+ if (future.awaitDone(remaining, TimeUnit.MILLISECONDS)) future.value()
+ else Map.empty[TopicPartition, T]
+ }.toMap
+
+ defaultResults ++ observedResults
+ }
+
+ override def isDone: Boolean = {
+ futures.forall(_.isDone)
+ }
+}
+
object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
@@ -249,11 +386,25 @@ object AdminClient {
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .define(
+ CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+ ConfigDef.Type.INT,
+ DefaultRequestTimeoutMs,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+ .define(
+ CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ DefaultRetryBackoffMs,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
config
}
+ case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
+
class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def createSimplePlaintext(brokerUrl: String): AdminClient = {
@@ -270,6 +421,8 @@ object AdminClient {
val metrics = new Metrics(time)
val metadata = new Metadata
val channelBuilder = ClientUtils.createChannelBuilder(config)
+ val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
+ val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
@@ -291,7 +444,7 @@ object AdminClient {
DefaultReconnectBackoffMs,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
- DefaultRequestTimeoutMs,
+ requestTimeoutMs,
time,
true,
new ApiVersions)
@@ -300,13 +453,13 @@ object AdminClient {
networkClient,
metadata,
time,
- DefaultRetryBackoffMs,
- DefaultRequestTimeoutMs)
+ retryBackoffMs,
+ requestTimeoutMs.toLong)
new AdminClient(
time,
- DefaultRequestTimeoutMs,
- DefaultRetryBackoffMs,
+ requestTimeoutMs,
+ retryBackoffMs,
highLevelClient,
bootstrapCluster.nodes.asScala.toList)
}
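For context, a minimal sketch of how the new deleteRecordsBefore() API above can be used (broker address and topic name are illustrative, not part of this patch):

    import kafka.admin.AdminClient
    import org.apache.kafka.common.TopicPartition

    // illustrative: delete everything before offset 5 on partition 0 of "my-topic"
    val client = AdminClient.createSimplePlaintext("localhost:9092")
    val results = client.deleteRecordsBefore(Map(new TopicPartition("my-topic", 0) -> 5L)).get()
    results.foreach { case (tp, result) =>
      if (result.error == null)
        println(s"$tp deleted up to low watermark ${result.lowWatermark}")
      else
        println(s"$tp failed: ${result.error}")
    }
    client.close()

Note that the returned CompositeFuture never raises per-partition failures from get(): every partition starts out mapped to a REQUEST_TIMED_OUT default, and any result observed before the timeout overwrites it via defaultResults ++ observedResults.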
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ac94a7e..b87c856 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -48,6 +48,7 @@ object BrokerApiVersionsCommand {
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
}
}
+ adminClient.close()
}
private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
new file mode 100644
index 0000000..b85a6ff
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.common.AdminCommandFailedException
+import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
+import joptsimple._
+
+import scala.util.{Failure, Success}
+
+/**
+ * A command to delete records of the given partitions down to the specified offset.
+ */
+object DeleteRecordsCommand {
+
+ def main(args: Array[String]): Unit = {
+ execute(args, System.out)
+ }
+
+ def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
+ Json.parseFull(jsonData) match {
+ case Some(m) =>
+ m.asInstanceOf[Map[String, Any]].get("partitions") match {
+ case Some(partitionsSeq) =>
+ partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ val partition = p.get("partition").get.asInstanceOf[Int]
+ val offset = p.get("offset").get.asInstanceOf[Int].toLong
+ new TopicPartition(topic, partition) -> offset
+ })
+ case None =>
+ Seq.empty
+ }
+ case None =>
+ Seq.empty
+ }
+ }
+
+ def execute(args: Array[String], out: PrintStream): Unit = {
+ val opts = new DeleteRecordsCommandOptions(args)
+ val adminClient = createAdminClient(opts)
+ val offsetJsonFile = opts.options.valueOf(opts.offsetJsonFileOpt)
+ val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
+ val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
+
+ val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, _) => tp })
+ if (duplicatePartitions.nonEmpty)
+ throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
+
+ out.println("Executing records delete operation")
+ val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+ out.println("Records delete operation completed:")
+
+ deleteRecordsResult.foreach{ case (tp, partitionResult) => {
+ if (partitionResult.error == null)
+ out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
+ else
+ out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+ }}
+ adminClient.close()
+ }
+
+ private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ AdminClient.create(props)
+ }
+
+ class DeleteRecordsCommandOptions(args: Array[String]) {
+ val BootstrapServerDoc = "REQUIRED: The server to connect to."
+ val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" +
+ "{\"partitions\":\n [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
+ val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+ val parser = new OptionParser
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+ .withRequiredArg
+ .describedAs("server(s) to use for bootstrapping")
+ .ofType(classOf[String])
+ val offsetJsonFileOpt = parser.accepts("offset-json-file", offsetJsonFileDoc)
+ .withRequiredArg
+ .describedAs("Offset json file path")
+ .ofType(classOf[String])
+ val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+ .withRequiredArg
+ .describedAs("command config property file path")
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
+ }
+}
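A sketch of how the new command would be invoked (file paths and the example output are illustrative; the class can always be run via the generic kafka-run-class.sh wrapper):

    // offsets.json, in the format documented by offsetJsonFileDoc above:
    //   {"partitions": [{"topic": "my-topic", "partition": 0, "offset": 5}], "version": 1}

    $ bin/kafka-run-class.sh kafka.admin.DeleteRecordsCommand \
        --bootstrap-server localhost:9092 \
        --offset-json-file offsets.json
    Executing records delete operation
    Records delete operation completed:
    partition: my-topic-0	low_watermark: 5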
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 2ed6452..6101d2a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -65,7 +65,9 @@ object ApiVersion {
"0.10.2" -> KAFKA_0_10_2_IV0,
// KIP-98 (idempotent and transactional producer support)
"0.11.0-IV0" -> KAFKA_0_11_0_IV0,
- "0.11.0" -> KAFKA_0_11_0_IV0
+ // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+ "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
+ "0.11.0" -> KAFKA_0_11_0_IV1
)
private val versionPattern = "\\.".r
@@ -155,3 +157,9 @@ case object KAFKA_0_11_0_IV0 extends ApiVersion {
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val id: Int = 10
}
+
+case object KAFKA_0_11_0_IV1 extends ApiVersion {
+ val version: String = "0.11.0-IV1"
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val id: Int = 11
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f91a3c3..39da605 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -204,7 +204,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
responseData.put(new TopicPartition(topic, partition),
new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
- JFetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+ JFetchResponse.INVALID_LAST_STABLE_OFFSET, JFetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
val errorResponse = new JFetchResponse(responseData, 0)
// Magic value does not matter here because the message set is empty
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c2d34d9..ddb2411 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.controller.KafkaController
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException}
import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
@@ -235,10 +235,20 @@ class Partition(val topic: String,
def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
getReplica(replicaId) match {
case Some(replica) =>
+ // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
+ val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
replica.updateLogReadResult(logReadResult)
+ val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+ // check if the LW of the partition has incremented
+ // since the replica's logStartOffset may have incremented
+ val leaderLWIncremented = newLeaderLW > oldLeaderLW
// check if we need to expand ISR to include this replica
// if it is not in the ISR yet
- maybeExpandIsr(replicaId, logReadResult)
+ val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+ // some delayed operations may be unblocked after HW or LW changed
+ if (leaderLWIncremented || leaderHWIncremented)
+ tryCompleteDelayedRequests()
debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
.format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
@@ -263,8 +273,8 @@ class Partition(val topic: String,
*
* This function can be triggered when a replica's LEO has incremented
*/
- def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
- val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+ def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
+ inWriteLock(leaderIsrUpdateLock) {
// check if this replica needs to be added to the ISR
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
@@ -280,18 +290,12 @@ class Partition(val topic: String,
updateIsr(newInSyncReplicas)
replicaManager.isrExpandRate.mark()
}
-
// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
-
case None => false // nothing to do if no longer leader
}
}
-
- // some delayed operations may be unblocked after HW changed
- if (leaderHWIncremented)
- tryCompleteDelayedRequests()
}
/*
@@ -376,12 +380,25 @@ class Partition(val topic: String,
}
/**
+ * The low watermark offset value, calculated only if the local replica is the partition leader.
+ * It is only used by the leader broker to decide when a DeleteRecordsRequest is satisfied. Its value is the minimum logStartOffset of all live replicas.
+ * The low watermark will increase when the leader broker receives either a FetchRequest or a DeleteRecordsRequest.
+ */
+ def lowWatermarkIfLeader: Long = {
+ if (!isLeaderReplicaLocal)
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
+ assignedReplicas.filter(replica =>
+ replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_ min _).getOrElse(0L)
+ }
+
+ /**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
private def tryCompleteDelayedRequests() {
val requestKey = new TopicPartitionOperationKey(topicPartition)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
+ replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
@@ -467,6 +484,27 @@ class Partition(val topic: String,
info
}
+ /**
+ * Update logStartOffset and low watermark if 1) offset <= highWatermark and 2) it is the leader replica.
+ * This function can trigger log segment deletion and log rolling.
+ *
+ * Returns the low watermark of the partition.
+ */
+ def deleteRecordsOnLeader(offset: Long): Long = {
+ inReadLock(leaderIsrUpdateLock) {
+ leaderReplicaIfLocal match {
+ case Some(leaderReplica) =>
+ if (!leaderReplica.log.get.config.delete)
+ throw new PolicyViolationException("Records of partition %s can not be deleted due to the configured policy".format(topicPartition))
+ leaderReplica.maybeIncrementLogStartOffset(offset)
+ lowWatermarkIfLeader
+ case None =>
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+ .format(topicPartition, localBrokerId))
+ }
+ }
+ }
+
private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
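A small worked example of the low watermark bookkeeping above (offsets are hypothetical): lowWatermarkIfLeader is just the minimum logStartOffset over live assigned replicas, so it only moves when the slowest live replica's logStartOffset moves.

    // hypothetical: three live replicas with logStartOffsets 0, 5 and 7
    Seq(0L, 5L, 7L).reduceOption(_ min _).getOrElse(0L)   // low watermark = 0
    // after the slowest follower's fetch advances its logStartOffset to 5
    Seq(5L, 5L, 7L).reduceOption(_ min _).getOrElse(0L)   // low watermark = 5,
    // which can now complete a delayed DeleteRecordsRequest for offset 5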
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8597b06..3995f9e 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,7 +21,8 @@ import kafka.log.Log
import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+
import org.apache.kafka.common.utils.Time
@@ -35,6 +36,9 @@ class Replica(val brokerId: Int,
// the log end offset value, kept in all replicas;
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+ // the log start offset value, kept in all replicas;
+ // for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
+ @volatile private[this] var _logStartOffset = Log.UnknownLogStartOffset
// The log end offset value at the time the leader received the last FetchRequest from this follower
// This is used to determine the lastCaughtUpTimeMs of the follower
@@ -72,6 +76,7 @@ class Replica(val brokerId: Int,
else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
+ logStartOffset = logReadResult.followerLogStartOffset
logEndOffset = logReadResult.info.fetchOffsetMetadata
lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
lastFetchTimeMs = logReadResult.fetchTimeMs
@@ -98,6 +103,33 @@ class Replica(val brokerId: Int,
else
logEndOffsetMetadata
+ def maybeIncrementLogStartOffset(offset: Long) {
+ if (isLocal) {
+ if (highWatermark.messageOffset < offset)
+ throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
+ s" ${highWatermark.messageOffset} of the partition $topicPartition")
+ log.get.maybeIncrementLogStartOffset(offset)
+ } else {
+ throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
+ }
+ }
+
+ private def logStartOffset_=(newLogStartOffset: Long) {
+ if (isLocal) {
+ throw new KafkaException(s"Should not set log start offset on partition $topicPartition's local replica $brokerId " +
+ s"without attempting to delete records of the log")
+ } else {
+ _logStartOffset = newLogStartOffset
+ trace(s"Setting log start offset for remote replica $brokerId for partition $topicPartition to [$newLogStartOffset]")
+ }
+ }
+
+ def logStartOffset =
+ if (isLocal)
+ log.get.logStartOffset
+ else
+ _logStartOffset
+
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
highWatermarkMetadata = newHighWatermark
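To illustrate the guard in maybeIncrementLogStartOffset above (values are hypothetical): records can only be deleted up to the high watermark, so an offset beyond it is rejected before the log is touched.

    // hypothetical local replica with highWatermark.messageOffset == 10
    replica.maybeIncrementLogStartOffset(5L)   // ok: log start offset may advance to 5
    replica.maybeIncrementLogStartOffset(15L)  // throws OffsetOutOfRangeException (15 > 10)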
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 75b1f24..c4b7ce6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -101,8 +101,7 @@ class ConsumerFetcherThread(name: String,
protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
if (partitionFetchState.isActive)
- fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.offset,
- fetchSize)
+ fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
}
new FetchRequest(fetchRequestBuilder.build())
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d2cac23..96535b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -79,6 +79,17 @@ case class LogAppendInfo(var firstOffset: Long,
*
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
+ * @param logStartOffset The earliest offset allowed to be exposed to the kafka client.
+ * The logStartOffset can be updated by:
+ * - the user's DeleteRecordsRequest
+ * - the broker's log retention
+ * - the broker's log truncation
+ * The logStartOffset is used to decide the following:
+ * - Log deletion. A LogSegment whose nextOffset <= the log's logStartOffset can be deleted.
+ * This may trigger log rolling if the active segment is deleted.
+ * - The earliest offset of the log in response to a ListOffsetRequest. To avoid an OffsetOutOfRange exception after the user
+ * seeks to the earliest offset, we make sure that logStartOffset <= the log's highWatermark.
+ * Other activities such as log cleaning are not affected by logStartOffset.
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
@@ -87,6 +98,7 @@ case class LogAppendInfo(var firstOffset: Long,
@threadsafe
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
+ @volatile var logStartOffset: Long = 0L,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
@@ -118,8 +130,10 @@ class Log(@volatile var dir: File,
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
activeSegment.size.toInt)
- info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
- .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+ logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+ info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
+ .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
}
val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
@@ -443,6 +457,20 @@ class Log(@volatile var dir: File,
}
}
+ /*
+ * Increment the log start offset if the provided offset is larger.
+ */
+ def maybeIncrementLogStartOffset(offset: Long) {
+ // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
+ // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shut down
+ // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
+ lock synchronized {
+ if (offset > logStartOffset) {
+ logStartOffset = offset
+ }
+ }
+ }
+
/**
* Validate the following:
* <ol>
@@ -543,7 +571,7 @@ class Log(@volatile var dir: File,
* @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
*
- * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
+ * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
@@ -558,9 +586,9 @@ class Log(@volatile var dir: File,
var entry = segments.floorEntry(startOffset)
- // attempt to read beyond the log end offset is an error
- if(startOffset > next || entry == null)
- throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
+ // return error on attempt to read beyond the log end offset or read below log start offset
+ if(startOffset > next || entry == null || startOffset < logStartOffset)
+ throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
@@ -626,7 +654,7 @@ class Log(@volatile var dir: File,
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
- return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
+ return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
@@ -640,7 +668,7 @@ class Log(@volatile var dir: File,
None
}
- targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+ targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
}
/**
@@ -666,16 +694,21 @@ class Log(@volatile var dir: File,
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
- val numToDelete = deletable.size
- if (numToDelete > 0) {
- // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
- if (segments.size == numToDelete)
- roll()
- // remove the segments for lookups
- deletable.foreach(deleteSegment)
- }
- numToDelete
+ deleteSegments(deletable)
+ }
+ }
+
+ private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+ val numToDelete = deletable.size
+ if (numToDelete > 0) {
+ // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+ if (segments.size == numToDelete)
+ roll()
+ // remove the segments for lookups
+ deletable.foreach(deleteSegment)
+ logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
}
+ numToDelete
}
/**
@@ -696,10 +729,10 @@ class Log(@volatile var dir: File,
*/
def deleteOldSegments(): Int = {
if (!config.delete) return 0
- deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
+ deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
}
- private def deleteRetenionMsBreachedSegments() : Int = {
+ private def deleteRetentionMsBreachedSegments() : Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
@@ -719,16 +752,27 @@ class Log(@volatile var dir: File,
deleteOldSegments(shouldDelete)
}
+ private def deleteLogStartOffsetBreachedSegments() : Int = {
+ // keep the active segment to avoid frequent log rolling due to a user's DeleteRecordsRequest
+ lock synchronized {
+ val deletable = {
+ if (segments.size() < 2)
+ Seq.empty
+ else
+ logSegments.sliding(2).takeWhile { iterable =>
+ val nextSegment = iterable.toSeq(1)
+ nextSegment.baseOffset <= logStartOffset
+ }.map(_.toSeq(0)).toSeq
+ }
+ deleteSegments(deletable)
+ }
+ }
+
/**
* The size of the log in bytes
*/
def size: Long = logSegments.map(_.size).sum
- /**
- * The earliest message offset in the log
- */
- def logStartOffset: Long = logSegments.head.baseOffset
-
/**
* The offset metadata of the next message that will be appended to the log
*/
@@ -789,7 +833,7 @@ class Log(@volatile var dir: File,
def roll(expectedNextOffset: Long = 0): LogSegment = {
val start = time.nanoseconds
lock synchronized {
- val newOffset = Math.max(expectedNextOffset, logEndOffset)
+ val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
val timeIndexFile = timeIndexFilename(dir, newOffset)
@@ -895,6 +939,7 @@ class Log(@volatile var dir: File,
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+ this.logStartOffset = math.min(targetOffset, this.logStartOffset)
}
}
}
@@ -920,6 +965,7 @@ class Log(@volatile var dir: File,
preallocate = config.preallocate))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
+ this.logStartOffset = newOffset
}
}
@@ -1082,6 +1128,8 @@ object Log {
/** a directory that is scheduled to be deleted */
val DeleteDirSuffix = "-delete"
+ val UnknownLogStartOffset = -1L
+
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
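A worked example of deleteLogStartOffsetBreachedSegments above (segment layout is hypothetical): only segments whose successor's base offset is still at or below logStartOffset are deletable, and the active segment is always kept.

    // hypothetical base offsets; 300L is the active segment
    val baseOffsets = Seq(0L, 100L, 200L, 300L)
    val logStartOffset = 250L
    // mirrors the sliding(2)/takeWhile check: pairs (0,100) and (100,200) pass,
    // (200,300) fails since 300 > 250, so the segments at 0 and 100 are deleted
    baseOffsets.sliding(2).takeWhile(_(1) <= logStartOffset).map(_(0)).toSeq   // Seq(0, 100)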
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 761edf9..a555420 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -48,12 +48,14 @@ class LogManager(val logDirs: Array[File],
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
- val flushCheckpointMs: Long,
+ val flushRecoveryOffsetCheckpointMs: Long,
+ val flushStartOffsetCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+ val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
@@ -64,6 +66,7 @@ class LogManager(val logDirs: Array[File],
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
+ private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
loadLogs()
// public, so we can access this from kafka.admin.DeleteTopicTest
@@ -139,10 +142,18 @@ class LogManager(val logDirs: Array[File],
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception =>
- warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+ warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
+ var logStartOffsets = Map[TopicPartition, Long]()
+ try {
+ logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+ } catch {
+ case e: Exception =>
+ warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+ }
+
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
@@ -153,8 +164,9 @@ class LogManager(val logDirs: Array[File],
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+ val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
- val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+ val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
} else {
@@ -210,7 +222,12 @@ class LogManager(val logDirs: Array[File],
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
- period = flushCheckpointMs,
+ period = flushRecoveryOffsetCheckpointMs,
+ TimeUnit.MILLISECONDS)
+ scheduler.schedule("kafka-log-start-offset-checkpoint",
+ checkpointLogStartOffsets,
+ delay = InitialTaskDelayMs,
+ period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
deleteLogs,
@@ -263,7 +280,10 @@ class LogManager(val logDirs: Array[File],
// update the last flush point
debug("Updating recovery points at " + dir)
- checkpointLogsInDir(dir)
+ checkpointLogRecoveryOffsetsInDir(dir)
+
+ debug("Updating log start offsets at " + dir)
+ checkpointLogStartOffsetsInDir(dir)
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
@@ -333,13 +353,21 @@ class LogManager(val logDirs: Array[File],
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
- this.logDirs.foreach(checkpointLogsInDir)
+ this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+ }
+
+ /**
+ * Write out the current log start offset for all logs to a text file in the log directory
+ * to avoid exposing data that have been deleted by DeleteRecordsRequest
+ */
+ def checkpointLogStartOffsets() {
+ this.logDirs.foreach(checkpointLogStartOffsetsInDir)
}
/**
* Make a checkpoint for all logs in provided directory.
*/
- private def checkpointLogsInDir(dir: File): Unit = {
+ private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
@@ -347,6 +375,17 @@ class LogManager(val logDirs: Array[File],
}
/**
+ * Checkpoint log start offset for all logs in provided directory.
+ */
+ private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
+ val logs = this.logsByDir.get(dir.toString)
+ if (logs.isDefined) {
+ this.logStartOffsetCheckpoints(dir).write(
+ logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset))
+ }
+ }
+
+ /**
* Get the log if it exists, otherwise return None
*/
def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
@@ -362,7 +401,7 @@ class LogManager(val logDirs: Array[File],
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
- val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
+ val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
@@ -425,6 +464,7 @@ class LogManager(val logDirs: Array[File],
val renamedDir = new File(removedLog.dir.getParent, dirName)
val renameSuccessful = removedLog.dir.renameTo(renamedDir)
if (renameSuccessful) {
+ checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
removedLog.dir = renamedDir
// change the file pointers for log and index file
for (logSegment <- removedLog.logSegments) {
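For reference, a sketch of what the new log-start-offset-checkpoint file would contain, assuming it uses the same plain-text layout as the existing OffsetCheckpoint files (a version line, an entry count, then one "topic partition offset" line per entry; contents below are hypothetical):

    0
    1
    my-topic 0 5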
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9263515..4e77625 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,32 +387,33 @@ class LogSegment(val log: FileRecords,
}
/**
- * Search the message offset based on timestamp.
- * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
- * greater than or equals to the target timestamp.
+ * Search the message offset based on timestamp and offset.
*
- * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
- * timestamp will be max timestamp in the segment.
+ * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
*
- * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
- * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+ * - If all the messages in the segment have smaller offsets, return None
+ * - If all the messages in the segment have smaller timestamps, return None
+ * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
+ * the returned offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
+ * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+ * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset.
*
- * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
- * from the indexed position. This could happen if the log is truncated after we get the indexed position but
- * before we scan the log from there. In this case we simply return None and the caller will need to check on
- * the truncated log and maybe retry or even do the search on another log segment.
+ * This method only returns None when 1) all messages' offsets < startingOffset or 2) the log is not empty but we did not
+ * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
+ * after we get the indexed position but before we scan the log from there. In this case we simply return None and the
+ * caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
*
* @param timestamp The timestamp to search for.
- * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
- * target timestamp. None will be returned if there is no such message.
+ * @param startingOffset The starting offset to search.
+ * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
*/
- def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+ def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
// Get the index entry with a timestamp less than or equal to the target timestamp
val timestampOffset = timeIndex.lookup(timestamp)
- val position = index.lookup(timestampOffset.offset).position
+ val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
// Search the timestamp
- Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset =>
+ Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
}
}
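A brief sketch of the new lookup semantics (values are hypothetical): passing the log start offset clamps the search so offsets below it are never returned.

    // hypothetical segment covering offsets 0..99, with logStartOffset = 50
    segment.findOffsetByTimestamp(timestamp)                        // may return an offset below 50
    segment.findOffsetByTimestamp(timestamp, startingOffset = 50L)  // first match at offset >= 50, or None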
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8842724..14e56bd 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -63,13 +63,13 @@ abstract class AbstractFetcherThread(name: String,
/* callbacks to be defined in subclass */
// process fetched data
- def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
+ protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
// handle a partition whose offset is out of range and return a new fetch offset
- def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+ protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
// deal with partitions with errors, potentially due to leadership changes
- def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
+ protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
@@ -140,17 +140,17 @@ abstract class AbstractFetcherThread(name: String,
val partitionId = topicPartition.partition
Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
// we append to the log if the current offset is defined and it is the same as the offset requested during fetch
- if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
+ if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
partitionData.error match {
case Errors.NONE =>
try {
val records = partitionData.toRecords
val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
- currentPartitionFetchState.offset)
+ currentPartitionFetchState.fetchOffset)
fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
- processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+ processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
val validBytes = records.validBytes
if (validBytes > 0) {
@@ -164,18 +164,18 @@ abstract class AbstractFetcherThread(name: String,
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
- logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+ logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset + " error " + ime.getMessage)
updatePartitionsWithError(topicPartition);
case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
- .format(topic, partitionId, currentPartitionFetchState.offset), e)
+ .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
}
case Errors.OFFSET_OUT_OF_RANGE =>
try {
val newOffset = handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
- .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+ .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
@@ -226,7 +226,7 @@ abstract class AbstractFetcherThread(name: String,
for (partition <- partitions) {
Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
if (currentPartitionFetchState.isActive)
- partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+ partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
)
}
partitionMapCond.signalAll()
@@ -350,11 +350,11 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
/**
* case class to keep the partition fetch offset and its state (active, inactive)
*/
-case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
- def this(offset: Long) = this(offset, new DelayedItem(0))
+ def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
- override def toString = "%d-%b".format(offset, isActive)
+ override def toString = "%d-%b".format(fetchOffset, isActive)
}
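For reference, the rename separates the fetch position from the log start offset that fetch requests now also carry per partition. A minimal self-contained sketch of the pattern (the DelayedItem below is a simplified stand-in for kafka.utils.DelayedItem, not the production class):

import java.util.concurrent.TimeUnit

// Simplified stand-in for kafka.utils.DelayedItem: remembers a due time.
class DelayedItem(delayMs: Long) {
  private val dueMs = System.currentTimeMillis() + delayMs
  def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(0L, dueMs - System.currentTimeMillis()), TimeUnit.MILLISECONDS)
}

// Mirrors the renamed case class: the offset is explicitly the *fetch* offset,
// distinct from the log start offset that now also travels in fetch requests.
case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
  def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
  override def toString = "%d-%b".format(fetchOffset, isActive)
}

object PartitionFetchStateDemo extends App {
  val active  = new PartitionFetchState(42L)                      // ready to fetch now
  val delayed = PartitionFetchState(42L, new DelayedItem(5000L))  // backed off for 5s
  println(active)   // 42-true
  println(delayed)  // 42-false
}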
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
new file mode 100644
index 0000000..e5b301c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.DeleteRecordsResponse
+
+import scala.collection._
+
+
+case class DeleteRecordsPartitionStatus(requiredOffset: Long,
+ responseStatus: DeleteRecordsResponse.PartitionResponse) {
+ @volatile var acksPending = false
+
+ override def toString = "[acksPending: %b, error: %s, lowWatermark: %d, requiredOffset: %d]"
+ .format(acksPending, responseStatus.error.toString, responseStatus.lowWatermark, requiredOffset)
+}
+
+/**
+ * A delayed delete records operation that can be created by the replica manager and watched
+ * in the delete records operation purgatory
+ */
+class DelayedDeleteRecords(delayMs: Long,
+ deleteRecordsStatus: Map[TopicPartition, DeleteRecordsPartitionStatus],
+ replicaManager: ReplicaManager,
+ responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit)
+ extends DelayedOperation(delayMs) {
+
+ // first update the acks pending variable according to the error code
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ if (status.responseStatus.error == Errors.NONE) {
+ // Timeout error state will be cleared when required acks are received
+ status.acksPending = true
+ status.responseStatus.error = Errors.REQUEST_TIMED_OUT
+ } else {
+ status.acksPending = false
+ }
+
+ trace("Initial partition status for %s is %s".format(topicPartition, status))
+ }
+
+ /**
+ * The delayed delete records operation can be completed if every partition specified in the request satisfies one of the following:
+ *
+ * 1) There was an error while checking whether all replicas have caught up to the deleteRecordsOffset: set an error in the response
+ * 2) The low watermark of the partition has caught up to the deleteRecordsOffset: set the low watermark in the response
+ *
+ */
+ override def tryComplete(): Boolean = {
+ // check for each partition if it still has pending acks
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ trace(s"Checking delete records satisfaction for ${topicPartition}, current status $status")
+ // skip those partitions that have already been satisfied
+ if (status.acksPending) {
+ val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
+ case Some(partition) =>
+ partition.leaderReplicaIfLocal match {
+ case Some(_) =>
+ val leaderLW = partition.lowWatermarkIfLeader
+ (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+ case None =>
+ (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+ }
+ case None =>
+ (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+ }
+ if (error != Errors.NONE || lowWatermarkReached) {
+ status.acksPending = false
+ status.responseStatus.error = error
+ status.responseStatus.lowWatermark = lw
+ }
+ }
+ }
+
+ // check if every partition has satisfied at least one of the two cases above
+ if (!deleteRecordsStatus.values.exists(_.acksPending))
+ forceComplete()
+ else
+ false
+ }
+
+ override def onExpiration() {
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ if (status.acksPending) {
+ DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
+ }
+ }
+ }
+
+ /**
+ * Upon completion, return the current response status along with the error code per partition
+ */
+ override def onComplete() {
+ val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+ responseCallback(responseStatus)
+ }
+}
+
+object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+
+ private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+
+ def recordExpiration(partition: TopicPartition) {
+ aggregateExpirationMeter.mark()
+ }
+}
+
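The completion rule above is the standard purgatory pattern: clear each partition's pending flag once its low watermark catches up or an error is observed, and force-complete when nothing is pending. A minimal self-contained sketch under simplified types (String partition keys and String error codes stand in for TopicPartition and Errors; returning a Boolean stands in for forceComplete):

// Simplified model of the DelayedDeleteRecords completion check: the request is
// done once every partition either hit an error or its low watermark caught up.
case class PartitionStatus(requiredOffset: Long,
                           var error: String = "REQUEST_TIMED_OUT",
                           var lowWatermark: Long = -1L,
                           var acksPending: Boolean = true)

class DelayedDeleteRecordsSketch(status: Map[String, PartitionStatus],
                                 currentLowWatermark: String => Option[Long]) {
  // Returns true once no partition is still waiting, mirroring tryComplete().
  def tryComplete(): Boolean = {
    status.foreach { case (_, s) =>
      if (s.acksPending) {
        currentLowWatermark(s.toString) match {
          case _ => // placeholder, replaced below
        }
      }
    }
    !status.values.exists(_.acksPending)
  }
}

object DelayedDeleteRecordsDemo extends App {
  var lw = Map("t-0" -> 3L)
  val status = Map("t-0" -> PartitionStatus(5L))
  def tryComplete(): Boolean = {
    status.foreach { case (partition, s) =>
      if (s.acksPending) {
        lw.get(partition) match {
          case Some(watermark) if watermark >= s.requiredOffset =>
            s.acksPending = false; s.error = "NONE"; s.lowWatermark = watermark
          case Some(_) => // still replicating; keep waiting
          case None =>
            s.acksPending = false; s.error = "UNKNOWN_TOPIC_OR_PARTITION"
        }
      }
    }
    !status.values.exists(_.acksPending)
  }
  println(tryComplete())  // false: low watermark 3 < required offset 5
  lw = Map("t-0" -> 5L)
  println(tryComplete())  // true: all replicas caught up
}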
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index a05131a..cbee78a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -150,7 +150,7 @@ class DelayedFetch(delayMs: Long,
)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
- tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+ tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
}
responseCallback(fetchPartitionData)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0798efd..defbf34 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,6 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
+ case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -147,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
+ val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
} else {
val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -199,7 +200,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val updateMetadataResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+ val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
coordinator.handleDeletedPartitions(deletedPartitions)
@@ -451,12 +452,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
// the callback for sending a fetch response
@@ -474,17 +475,17 @@ class KafkaApis(val requestChannel: RequestChannel,
val convertedData = replicaManager.getMagic(tp) match {
case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
trace(s"Down converting message to V0 for fetch request from $clientId")
- FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+ FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
trace(s"Down converting message to V1 for fetch request from $clientId")
- FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+ FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
case _ => data
}
- tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LSO,
- null, convertedData.records)
+ tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ convertedData.logStartOffset, null, convertedData.records)
}
}
@@ -728,7 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new Array[(Long, Long)](segments.length)
for (i <- segments.indices)
- offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
+ offsetTimeArray(i) = (math.max(segments(i).baseOffset, log.logStartOffset), segments(i).lastModified)
if (lastSegmentHasSize)
offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
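The math.max clamp above keeps listable offsets consistent after a DeleteRecords call that lands mid-segment: a segment may still exist on disk, but offsets below the new log start offset must no longer be reported. A small self-contained illustration with hypothetical numbers:

object OffsetClampDemo extends App {
  val segmentBaseOffsets = Seq(0L, 100L, 200L)  // hypothetical segment layout
  val logStartOffset     = 150L                 // records before 150 were deleted
  // Mirrors the clamp: the first two segments still exist on disk, but their
  // base offsets are raised to the log start offset before being reported.
  val listable = segmentBaseOffsets.map(base => math.max(base, logStartOffset))
  println(listable)  // List(150, 150, 200)
}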
@@ -1259,6 +1260,54 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleDeleteRecordsRequest(request: RequestChannel.Request) {
+ val deleteRecordsRequest = request.body[DeleteRecordsRequest]
+
+ val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
+ case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+ }
+
+ val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
+ case (topicPartition, _) => authorize(request.session, Delete, new Resource(auth.Topic, topicPartition.topic))
+ }
+
+ // the callback for sending a DeleteRecordsResponse
+ def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+
+ val mergedResponseStatus = responseStatus ++
+ unauthorizedForDeleteTopics.mapValues(_ =>
+ new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+ new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+ mergedResponseStatus.foreach { case (topicPartition, status) =>
+ if (status.error != Errors.NONE) {
+ debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
+ request.header.correlationId,
+ request.header.clientId,
+ topicPartition,
+ status.error.exceptionName))
+ }
+ }
+
+ val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeMs = time.milliseconds
+ }
+
+ if (authorizedForDeleteTopics.isEmpty)
+ sendResponseCallback(Map.empty)
+ else {
+ // call the replica manager to delete records on the replicas
+ replicaManager.deleteRecords(
+ deleteRecordsRequest.timeout.toLong,
+ authorizedForDeleteTopics.mapValues(_.toLong),
+ sendResponseCallback)
+ }
+ }
+
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
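The handler's two partition() calls implement a common authorization pattern: filter first on visibility (Describe permission plus topic existence), then on the operation itself, and pre-fill error responses for everything filtered out so unauthorized callers cannot probe topic existence. A minimal self-contained sketch with simplified types (String topic-partitions and String error codes):

object DeleteRecordsAuthDemo extends App {
  case class PartitionResponse(lowWatermark: Long, error: String)

  // Simplified model of the two-stage split in handleDeleteRecordsRequest.
  def route(requested: Map[String, Long],
            canDescribe: String => Boolean,
            canDelete: String => Boolean,
            topicExists: String => Boolean): Map[String, PartitionResponse] = {
    val (describable, hidden) = requested.partition { case (tp, _) => canDescribe(tp) && topicExists(tp) }
    val (authorized, unauthorized) = describable.partition { case (tp, _) => canDelete(tp) }

    // Hidden topics look nonexistent (no information leak); visible but
    // unauthorized topics get an explicit authorization error.
    hidden.map { case (tp, _) => tp -> PartitionResponse(-1L, "UNKNOWN_TOPIC_OR_PARTITION") } ++
      unauthorized.map { case (tp, _) => tp -> PartitionResponse(-1L, "TOPIC_AUTHORIZATION_FAILED") } ++
      authorized.map { case (tp, offset) => tp -> PartitionResponse(offset, "NONE") } // stand-in for the real delete
  }

  println(route(Map("a-0" -> 5L, "b-0" -> 7L, "c-0" -> 9L),
    canDescribe = Set("a-0", "b-0"), canDelete = Set("a-0"), topicExists = Set("a-0", "b-0")))
  // a-0 -> NONE, b-0 -> TOPIC_AUTHORIZATION_FAILED, c-0 -> UNKNOWN_TOPIC_OR_PARTITION
}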
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 879bc51..fe6631e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -100,6 +100,7 @@ object Defaults {
val LogDeleteDelayMs = 60000
val LogFlushSchedulerIntervalMs = Long.MaxValue
val LogFlushOffsetCheckpointIntervalMs = 60000
+ val LogFlushStartOffsetCheckpointIntervalMs = 60000
val LogPreAllocateEnable = false
// lazy val as `InterBrokerProtocolVersion` is defined later
lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
@@ -125,6 +126,7 @@ object Defaults {
val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
val FetchPurgatoryPurgeIntervalRequests = 1000
val ProducerPurgatoryPurgeIntervalRequests = 1000
+ val DeleteRecordsPurgatoryPurgeIntervalRequests = 1
val AutoLeaderRebalanceEnable = true
val LeaderImbalancePerBrokerPercentage = 10
val LeaderImbalanceCheckIntervalSeconds = 300
@@ -273,6 +275,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+ val LogFlushStartOffsetCheckpointIntervalMsProp = "log.flush.start.offset.checkpoint.interval.ms"
val LogPreAllocateProp = "log.preallocate"
val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
@@ -296,6 +299,7 @@ object KafkaConfig {
val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+ val DeleteRecordsPurgatoryPurgeIntervalRequestsProp = "delete.records.purgatory.purge.interval.requests"
val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
@@ -468,6 +472,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used"
val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+ val LogFlushStartOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the log start offset"
val LogPreAllocateEnableDoc = "Should we preallocate the file when creating a new segment? If you are using Kafka on Windows, you probably need to set it to true."
val LogMessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
"Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format version, the " +
@@ -521,6 +526,7 @@ object KafkaConfig {
val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
+ val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory"
val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals"
val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
@@ -686,6 +692,7 @@ object KafkaConfig {
.define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
.define(LogFlushIntervalMsProp, LONG, null, HIGH, LogFlushIntervalMsDoc)
.define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+ .define(LogFlushStartOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushStartOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushStartOffsetCheckpointIntervalMsDoc)
.define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
@@ -710,6 +717,7 @@ object KafkaConfig {
.define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
.define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc)
.define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
+ .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DeleteRecordsPurgatoryPurgeIntervalRequests, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc)
.define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
.define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
.define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
@@ -862,6 +870,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+ val logFlushStartOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
@@ -907,6 +916,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+ val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
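A minimal sketch of configuring the two new broker properties; this assumes a broker build that includes this patch and that zookeeper.connect is the only other property required to construct a KafkaConfig:

import java.util.Properties
import kafka.server.KafkaConfig

object NewConfigDemo extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")  // required base config
  // New: how often log start offsets are checkpointed to disk.
  props.put("log.flush.start.offset.checkpoint.interval.ms", "30000")
  // New: purge interval (in requests) of the DeleteRecords purgatory.
  props.put("delete.records.purgatory.purge.interval.requests", "1")

  val config = KafkaConfig.fromProps(props)
  assert(config.logFlushStartOffsetCheckpointIntervalMs == 30000L)
  assert(config.deleteRecordsPurgatoryPurgeIntervalRequests == 1)
}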
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 465b0b7..0d3e49c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -297,7 +297,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
- new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
+ new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
private def initZk(): ZkUtils = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -655,7 +655,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
- flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+ flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+ flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a6090d..bf36974 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,6 +132,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
+ def isBrokerAlive(brokerId: Int): Boolean = {
+ inReadLock(partitionMetadataLock) {
+ aliveBrokers.contains(brokerId)
+ }
+ }
+
def getAliveBrokers: Seq[Broker] = {
inReadLock(partitionMetadataLock) {
aliveBrokers.values.toBuffer
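isBrokerAlive follows the cache's locking convention: every read of the broker map goes through the shared read lock. A minimal self-contained sketch of that convention (the inReadLock/inWriteLock helpers below are simplified stand-ins for kafka.utils.CoreUtils.inReadLock and inWriteLock):

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

class AliveBrokerCache {
  private val lock = new ReentrantReadWriteLock()
  private val aliveBrokers = mutable.Map[Int, String]()  // brokerId -> host, simplified

  private def inReadLock[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  def update(brokerId: Int, host: String): Unit = inWriteLock { aliveBrokers(brokerId) = host }

  // Mirrors MetadataCache.isBrokerAlive: a read-lock-guarded membership check.
  def isBrokerAlive(brokerId: Int): Boolean = inReadLock { aliveBrokers.contains(brokerId) }
}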
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29a2467..5f055a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -60,7 +60,8 @@ class ReplicaFetcherThread(name: String,
type PD = PartitionData
private val fetchRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
+ else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
@@ -136,10 +137,12 @@ class ReplicaFetcherThread(name: String,
trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+ val leaderLogStartOffset = partitionData.logStartOffset
// for the follower replica, we do not need to keep
// its segment base offset or the physical position;
// these values will be computed upon becoming the leader
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+ replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
if (logger.isTraceEnabled)
trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
@@ -289,8 +292,10 @@ class ReplicaFetcherThread(name: String,
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
- if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
- requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
+ if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+ val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+ requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+ }
}
val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap)
@@ -313,7 +318,7 @@ object ReplicaFetcherThread {
private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
def isEmpty: Boolean = underlying.fetchData().isEmpty
def offset(topicPartition: TopicPartition): Long =
- underlying.fetchData().asScala(topicPartition).offset
+ underlying.fetchData().asScala(topicPartition).fetchOffset
}
private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -326,6 +331,8 @@ object ReplicaFetcherThread {
def highWatermark: Long = underlying.highWatermark
+ def logStartOffset: Long = underlying.logStartOffset
+
def exception: Option[Throwable] = error match {
case Errors.NONE => None
case e => Some(e.exception)
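The fetchRequestVersion cascade pins the request version to the newest format the configured inter-broker protocol supports, falling back one step at a time. A minimal self-contained sketch of the gating (ApiVersionSketch is a simplified stand-in for kafka.api.ApiVersion; the ids below only encode relative order):

object FetchVersionDemo extends App {
  case class ApiVersionSketch(id: Int) extends Ordered[ApiVersionSketch] {
    def compare(that: ApiVersionSketch): Int = this.id - that.id
  }

  val KAFKA_0_9_0      = ApiVersionSketch(1)
  val KAFKA_0_10_0_IV0 = ApiVersionSketch(2)
  val KAFKA_0_10_1_IV1 = ApiVersionSketch(3)
  val KAFKA_0_11_0_IV0 = ApiVersionSketch(4)
  val KAFKA_0_11_0_IV1 = ApiVersionSketch(5)

  def fetchRequestVersion(interBrokerProtocolVersion: ApiVersionSketch): Short =
    if (interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5      // v5 carries logStartOffset
    else if (interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
    else if (interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
    else if (interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
    else if (interBrokerProtocolVersion >= KAFKA_0_9_0) 1
    else 0

  println(fetchRequestVersion(KAFKA_0_11_0_IV1))  // 5
  println(fetchRequestVersion(KAFKA_0_10_0_IV0))  // 2
}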
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5ba093e..8f67425 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,12 +29,13 @@ import kafka.log.{Log, LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -52,6 +53,13 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
}
}
+case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
+ def error: Errors = exception match {
+ case None => Errors.NONE
+ case Some(e) => Errors.forException(e)
+ }
+}
+
/*
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
@@ -63,7 +71,9 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
*/
case class LogReadResult(info: FetchDataInfo,
hw: Long,
+ leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
+ followerLogStartOffset: Long,
fetchTimeMs: Long,
readSize: Int,
exception: Option[Throwable] = None) {
@@ -74,16 +84,19 @@ case class LogReadResult(info: FetchDataInfo,
}
override def toString =
- s"Fetch Data: [$info], HW: [$hw], leaderLogEndOffset: [$leaderLogEndOffset], readSize: [$readSize], error: [$error]"
+ s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
+ s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
}
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, records: Records)
+case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records)
object LogReadResult {
val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = -1)
}
@@ -109,6 +122,7 @@ class ReplicaManager(val config: KafkaConfig,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
quotaManager: ReplicationQuotaManager,
+ val metadataCache: MetadataCache,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -130,6 +144,8 @@ class ReplicaManager(val config: KafkaConfig,
purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)
+ val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", localBrokerId, config.deleteRecordsPurgatoryPurgeIntervalRequests)
val leaderCount = newGauge(
"LeaderCount",
@@ -212,6 +228,15 @@ class ReplicaManager(val config: KafkaConfig,
debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
}
+ /**
+ * Try to complete some delayed DeleteRecordsRequests with the request key;
+ * this needs to be triggered when the partition low watermark has changed
+ */
+ def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
+ val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
+ debug("Request key %s unblocked %d DeleteRecordsRequests.".format(key.keyLabel, completed))
+ }
+
def startup() {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
@@ -316,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
}
- if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
+ if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
@@ -345,14 +370,108 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /**
+ * Delete records on the leader replicas of the given partitions, and wait for the deletion to be propagated to the other replicas;
+ * the callback function will be triggered either on timeout or when the logStartOffset of all live replicas has reached the specified offset
+ */
+ private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+ trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
+ offsetPerPartition.map { case (topicPartition, requestedOffset) =>
+ // reject delete records operation on internal topics
+ if (Topic.isInternal(topicPartition.topic)) {
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
+ } else {
+ try {
+ val partition = getPartition(topicPartition).getOrElse(
+ throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+ val convertedOffset =
+ if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
+ partition.leaderReplicaIfLocal match {
+ case Some(leaderReplica) =>
+ leaderReplica.highWatermark.messageOffset
+ case None =>
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+ .format(topicPartition, localBrokerId))
+ }
+ } else
+ requestedOffset
+ if (convertedOffset < 0)
+ throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
+
+ val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
+ (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
+ } catch {
+ // NOTE: The failed-request metric is not incremented for known exceptions;
+ // it is supposed to indicate unexpected failures of a broker in handling a delete records request
+ case e: KafkaStorageException =>
+ fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
+ Runtime.getRuntime.halt(1)
+ (topicPartition, null)
+ case e@ (_: UnknownTopicOrPartitionException |
+ _: NotLeaderForPartitionException |
+ _: OffsetOutOfRangeException |
+ _: PolicyViolationException |
+ _: NotEnoughReplicasException) =>
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
+ case t: Throwable =>
+ error("Error processing delete records operation on partition %s".format(topicPartition), t)
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t)))
+ }
+ }
+ }
+ }
+
+ // If there exists a topic partition that meets both of the following requirements,
+ // we need to put the DeleteRecordsRequest into the delayed purgatory and wait for the delete records operation to complete:
+ //
+ // 1. the delete records operation on this partition succeeded locally
+ // 2. the low watermark of this partition is still smaller than the specified offset
+ private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
+ localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
+ deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
+ }
+ }
+
+ def deleteRecords(timeout: Long,
+ offsetPerPartition: Map[TopicPartition, Long],
+ responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
+ val timeBeforeLocalDeleteRecords = time.milliseconds
+ val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+ debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
+
+ val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
+ topicPartition ->
+ DeleteRecordsPartitionStatus(
+ result.requestedOffset, // requested offset
+ new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status
+ }
+
+ if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
+ // create delayed delete records operation
+ val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+
+ // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
+ val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+ // try to complete the request immediately, otherwise put it into the purgatory
+ // this is because while the delayed delete records operation is being created, new
+ // requests may arrive and hence make this operation completable.
+ delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
+ } else {
+ // we can respond immediately
+ val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+ responseCallback(deleteRecordsResponseStatus)
+ }
+ }
+
// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
- private def delayedRequestRequired(requiredAcks: Short,
- entriesPerPartition: Map[TopicPartition, MemoryRecords],
- localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
+ private def delayedProduceRequestRequired(requiredAcks: Short,
+ entriesPerPartition: Map[TopicPartition, MemoryRecords],
+ localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
entriesPerPartition.nonEmpty &&
localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
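The deleteRecords path above responds immediately unless some partition was truncated locally but its low watermark still trails the requested offset; only in that case is the request parked in the purgatory. A minimal self-contained sketch of that decision (simplified types; LocalResult stands in for LogDeleteRecordsResult):

object DeleteRecordsFlowDemo extends App {
  case class LocalResult(requestedOffset: Long, lowWatermark: Long, failed: Boolean = false)

  // Mirrors delayedDeleteRecordsRequired: delay the response only if some
  // partition succeeded locally but replication has not yet caught up.
  def delayedDeleteRecordsRequired(results: Map[String, LocalResult]): Boolean =
    results.values.exists(r => !r.failed && r.lowWatermark < r.requestedOffset)

  // t-0 already caught up and t-1 failed outright: respond immediately.
  println(delayedDeleteRecordsRequired(Map(
    "t-0" -> LocalResult(requestedOffset = 5L, lowWatermark = 5L),
    "t-1" -> LocalResult(5L, -1L, failed = true))))  // false

  // t-0 truncated locally but followers lag: park the request in the purgatory.
  println(delayedDeleteRecordsRequired(Map(
    "t-0" -> LocalResult(5L, 2L))))                  // true
}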
@@ -471,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
- tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+ tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
}
responseCallback(fetchPartitionData)
} else {
@@ -508,8 +627,9 @@ class ReplicaManager(val config: KafkaConfig,
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
- val offset = fetchInfo.offset
+ val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
+ val followerLogStartOffset = fetchInfo.logStartOffset
BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
@@ -539,6 +659,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
val initialLogEndOffset = localReplica.logEndOffset.messageOffset
val initialHighWatermark = localReplica.highWatermark.messageOffset
+ val initialLogStartOffset = localReplica.logStartOffset
val fetchTimeMs = time.milliseconds
val logReadInfo = localReplica.log match {
case Some(log) =>
@@ -563,7 +684,9 @@ class ReplicaManager(val config: KafkaConfig,
LogReadResult(info = logReadInfo,
hw = initialHighWatermark,
+ leaderLogStartOffset = initialLogStartOffset,
leaderLogEndOffset = initialLogEndOffset,
+ followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
readSize = partitionFetchSize,
exception = None)
@@ -576,7 +699,9 @@ class ReplicaManager(val config: KafkaConfig,
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
@@ -586,7 +711,9 @@ class ReplicaManager(val config: KafkaConfig,
error(s"Error processing fetch operation on partition $tp, offset $offset", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
@@ -622,7 +749,7 @@ class ReplicaManager(val config: KafkaConfig,
def getMagic(topicPartition: TopicPartition): Option[Byte] =
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
- def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] = {
+ def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -640,7 +767,6 @@ class ReplicaManager(val config: KafkaConfig,
}
def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
- metadataCache: MetadataCache,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -695,7 +821,7 @@ class ReplicaManager(val config: KafkaConfig,
else
Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
- makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
+ makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
else
Set.empty[Partition]
@@ -801,8 +927,7 @@ class ReplicaManager(val config: KafkaConfig,
epoch: Int,
partitionState: Map[Partition, PartitionState],
correlationId: Int,
- responseMap: mutable.Map[TopicPartition, Errors],
- metadataCache: MetadataCache) : Set[Partition] = {
+ responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")