You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/19 19:52:34 UTC
[kafka] branch 2.1 updated: KAFKA-8564;
Fix NPE on deleted partition dir when no segments remain (#6968)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 96d8bb4 KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)
96d8bb4 is described below
commit 96d8bb4e01ae31b1d18a5d6a5f6aefce01a94dc4
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Jun 19 20:41:05 2019 +0100
KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)
Kafka should not NPE while loading a deleted partition dir with no log segments. This patch ensures that there will always be at least one segment after initialization.
Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
Co-authored-by: Mickael Maison <mi...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/log/Log.scala | 9 +++++++++
core/src/test/scala/unit/kafka/log/LogTest.scala | 12 +++++++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 33cb48f..24d232f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -577,6 +577,15 @@ class Log(@volatile var dir: File,
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
+ if (logSegments.isEmpty) {
+ addSegment(LogSegment.open(dir = dir,
+ baseOffset = 0,
+ config,
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = this.initFileSize,
+ preallocate = false))
+ }
0
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fc306ee..570e380 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3683,7 +3683,17 @@ class LogTest {
assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head)
}
- private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+ @Test
+ def testLoadPartitionDirWithNoSegmentsShouldNotThrow() {
+ val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
+ val logDir = new File(tmpDir, dirName)
+ logDir.mkdirs()
+ val logConfig = LogTest.createLogConfig()
+ val log = createLog(logDir, logConfig)
+ assertEquals(1, log.numberOfSegments)
+ }
+
+ private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {
var sequence = 0