You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/07/06 18:35:05 UTC

[kafka] branch trunk updated: KAFKA-8398: Prevent NPE in `forceUnmap` (#8983)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a24da3  KAFKA-8398: Prevent NPE in `forceUnmap` (#8983)
8a24da3 is described below

commit 8a24da376b801c6eb6522ad4861b83f5beb5826c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Jul 6 11:34:36 2020 -0700

    KAFKA-8398: Prevent NPE in `forceUnmap` (#8983)
    
    Without this change, we would catch the NPE and log it.
    This was misleading and could cause excessive log
    volume.
    
    The NPE can happen after `AlterReplicaLogDirs` completes
    successfully and when unmapping older regions. Example
    stacktrace:
    
    ```text
    [2019-05-20 14:08:13,999] ERROR Error unmapping index /tmp/kafka-logs/test-0.567a0d8ff88b45ab95794020d0b2e66f-delete/00000000000000000000.index (kafka.log.OffsetIndex)
    java.lang.NullPointerException
    at org.apache.kafka.common.utils.MappedByteBuffers.unmap(MappedByteBuffers.java:73)
    at kafka.log.AbstractIndex.forceUnmap(AbstractIndex.scala:318)
    at kafka.log.AbstractIndex.safeForceUnmap(AbstractIndex.scala:308)
    at kafka.log.AbstractIndex.$anonfun$closeHandler$1(AbstractIndex.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.log.AbstractIndex.closeHandler(AbstractIndex.scala:257)
    at kafka.log.AbstractIndex.deleteIfExists(AbstractIndex.scala:226)
    at kafka.log.LogSegment.$anonfun$deleteIfExists$6(LogSegment.scala:597)
    at kafka.log.LogSegment.delete$1(LogSegment.scala:585)
    at kafka.log.LogSegment.$anonfun$deleteIfExists$5(LogSegment.scala:597)
    at kafka.utils.CoreUtils$.$anonfun$tryAll$1(CoreUtils.scala:115)
    at kafka.utils.CoreUtils$.$anonfun$tryAll$1$adapted(CoreUtils.scala:114)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.utils.CoreUtils$.tryAll(CoreUtils.scala:114)
    at kafka.log.LogSegment.deleteIfExists(LogSegment.scala:599)
    at kafka.log.Log.$anonfun$delete$3(Log.scala:1762)
    at kafka.log.Log.$anonfun$delete$3$adapted(Log.scala:1762)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at kafka.log.Log.$anonfun$delete$2(Log.scala:1762)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.log.Log.maybeHandleIOException(Log.scala:2013)
    at kafka.log.Log.delete(Log.scala:1759)
    at kafka.log.LogManager.deleteLogs(LogManager.scala:761)
    at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:775)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    ```
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
    
    Co-authored-by: Jaikiran Pai <ja...@gmail.com>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index eb0906a..8b6110e 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -311,9 +311,11 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
   }
 
   protected def safeForceUnmap(): Unit = {
-    try forceUnmap()
-    catch {
-      case t: Throwable => error(s"Error unmapping index $file", t)
+    if (mmap != null) {
+      try forceUnmap()
+      catch {
+        case t: Throwable => error(s"Error unmapping index $file", t)
+      }
     }
   }