You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/19 19:52:34 UTC

[kafka] branch 2.1 updated: KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 96d8bb4  KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)
96d8bb4 is described below

commit 96d8bb4e01ae31b1d18a5d6a5f6aefce01a94dc4
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Jun 19 20:41:05 2019 +0100

    KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (#6968)
    
    Kafka should not throw a NullPointerException while loading a deleted partition directory that contains no log segments. This patch ensures that there is always at least one segment after initialization.
    
    Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala          |  9 +++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala | 12 +++++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 33cb48f..24d232f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -577,6 +577,15 @@ class Log(@volatile var dir: File,
       activeSegment.resizeIndexes(config.maxIndexSize)
       nextOffset
     } else {
+       if (logSegments.isEmpty) {
+          addSegment(LogSegment.open(dir = dir,
+            baseOffset = 0,
+            config,
+            time = time,
+            fileAlreadyExists = false,
+            initFileSize = this.initFileSize,
+            preallocate = false))
+       }
       0
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index fc306ee..570e380 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3683,7 +3683,17 @@ class LogTest {
     assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head)
   }
 
- private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+  @Test
+  def testLoadPartitionDirWithNoSegmentsShouldNotThrow() {
+    val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
+    val logDir = new File(tmpDir, dirName)
+    logDir.mkdirs()
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments)
+  }
+
+  private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
 
   private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {
     var sequence = 0