You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/22 18:32:24 UTC

kafka git commit: KAFKA-5915; Support unmapping of mapped/direct buffers in Java 9

Repository: kafka
Updated Branches:
  refs/heads/trunk 125d8d6f7 -> e554dc518


KAFKA-5915; Support unmapping of mapped/direct buffers in Java 9

As mentioned in MappedByteBuffers' class documentation, its
implementation was inspired by Lucene's MMapDirectory:

https://github.com/apache/lucene-solr/blob/releases/lucene-solr/6.6.1/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java#L315

Without this change, unmapping fails with the following message:

> java.lang.IllegalAccessError: class kafka.log.AbstractIndex (in unnamed module 0x45103d6b) cannot access class jdk.internal.ref.Cleaner (in module java.base) because module java.base does not export jdk.internal.ref to unnamed module 0x45103d6b

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3879 from ijuma/kafka-5915-unmap-mapped-buffers-java-9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e554dc51
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e554dc51
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e554dc51

Branch: refs/heads/trunk
Commit: e554dc518eaaa0747899e708160275f95c4e525f
Parents: 125d8d6
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Sep 22 19:32:09 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 22 19:32:09 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/utils/MappedByteBuffers.java   | 136 +++++++++++++++++++
 .../common/utils/MappedByteBuffersTest.java     |  41 ++++++
 .../main/scala/kafka/log/AbstractIndex.scala    |  39 ++----
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  23 +++-
 4 files changed, 209 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
new file mode 100644
index 0000000..1faecb3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static java.lang.invoke.MethodHandles.constant;
+import static java.lang.invoke.MethodHandles.dropArguments;
+import static java.lang.invoke.MethodHandles.filterReturnValue;
+import static java.lang.invoke.MethodHandles.guardWithTest;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.lang.invoke.MethodType.methodType;
+
+/**
+ * Utility methods for MappedByteBuffer implementations.
+ *
+ * The unmap implementation was inspired by the one in Lucene's MMapDirectory.
+ */
+public final class MappedByteBuffers {
+
+    private static final Logger log = LoggerFactory.getLogger(MappedByteBuffers.class);
+
+    // null if unmap is not supported
+    private static final MethodHandle UNMAP;
+
+    // null if unmap is supported
+    private static final RuntimeException UNMAP_NOT_SUPPORTED_EXCEPTION;
+
+    static {
+        Object unmap = null;
+        RuntimeException exception = null;
+        try {
+            unmap = lookupUnmapMethodHandle();
+        } catch (RuntimeException e) {
+            exception = e;
+        }
+        if (unmap != null) {
+            UNMAP = (MethodHandle) unmap;
+            UNMAP_NOT_SUPPORTED_EXCEPTION = null;
+        } else {
+            UNMAP = null;
+            UNMAP_NOT_SUPPORTED_EXCEPTION = exception;
+        }
+    }
+
+    private MappedByteBuffers() {}
+
+    public static void unmap(String resourceDescription, MappedByteBuffer buffer) throws IOException {
+        if (!buffer.isDirect())
+            throw new IllegalArgumentException("Unmapping only works with direct buffers");
+        if (UNMAP == null)
+            throw UNMAP_NOT_SUPPORTED_EXCEPTION;
+
+        try {
+            UNMAP.invokeExact((ByteBuffer) buffer);
+        } catch (Throwable throwable) {
+            throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, throwable);
+        }
+    }
+
+    private static MethodHandle lookupUnmapMethodHandle() {
+        final MethodHandles.Lookup lookup = lookup();
+        try {
+            if (Java.IS_JAVA9_COMPATIBLE)
+                return unmapJava9(lookup);
+            else
+                return unmapJava7Or8(lookup);
+        } catch (ReflectiveOperationException | RuntimeException e1) {
+            throw new UnsupportedOperationException("Unmapping is not supported on this platform, because internal " +
+                "Java APIs are not compatible with this Kafka version", e1);
+        }
+    }
+
+    private static MethodHandle unmapJava7Or8(MethodHandles.Lookup lookup) throws ReflectiveOperationException {
+        /* "Compile" a MethodHandle that is roughly equivalent to the following lambda:
+         *
+         * (ByteBuffer buffer) -> {
+         *   sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
+         *   if (nonNull(cleaner))
+         *     cleaner.clean();
+         *   else
+         *     noop(cleaner); // the noop is needed because MethodHandles#guardWithTest always needs both if and else
+         * }
+         */
+        Class<?> directBufferClass = Class.forName("java.nio.DirectByteBuffer");
+        Method m = directBufferClass.getMethod("cleaner");
+        m.setAccessible(true);
+        MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
+        Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
+        MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
+        MethodHandle nonNullTest = lookup.findStatic(MappedByteBuffers.class, "nonNull",
+                methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
+        MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
+        MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
+                .asType(methodType(void.class, ByteBuffer.class));
+        return unmapper;
+    }
+
+    private static MethodHandle unmapJava9(MethodHandles.Lookup lookup) throws ReflectiveOperationException {
+        Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+        MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner",
+                methodType(void.class, ByteBuffer.class));
+        Field f = unsafeClass.getDeclaredField("theUnsafe");
+        f.setAccessible(true);
+        Object theUnsafe = f.get(null);
+        return unmapper.bindTo(theUnsafe);
+    }
+
+    private static boolean nonNull(Object o) {
+        return o != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
new file mode 100644
index 0000000..38fe9dd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class MappedByteBuffersTest {
+
+    /**
+     * Checks that unmap doesn't throw exceptions.
+     */
+    @Test
+    public void testUnmap() throws Exception {
+        File file = TestUtils.tempFile();
+        try (FileChannel channel = FileChannel.open(file.toPath())) {
+            MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_ONLY, 0, 0);
+            MappedByteBuffers.unmap(file.getAbsolutePath(), map);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 2d7cc7e..40ec870 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -25,8 +25,7 @@ import java.util.concurrent.locks.{Lock, ReentrantLock}
 import kafka.log.IndexSearchType.IndexSearchEntity
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.{CoreUtils, Logging}
-import org.apache.kafka.common.utils.{OperatingSystem, Utils}
-import sun.nio.ch.DirectBuffer
+import org.apache.kafka.common.utils.{MappedByteBuffers, OperatingSystem, Utils}
 
 import scala.math.ceil
 
@@ -109,7 +108,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
 
       /* Windows won't let us modify the file length while the file is mmapped :-( */
       if (OperatingSystem.IS_WINDOWS)
-        forceUnmap(mmap)
+        safeForceUnmap()
       try {
         raf.setLength(roundedNewSize)
         mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
@@ -150,10 +149,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
       // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
       // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
-      CoreUtils.swallow(forceUnmap(mmap))
-      // Accessing unmapped mmap crashes JVM by SEGV.
-      // Accessing it after this method called sounds like a bug but for safety, assign null and do not allow later access.
-      mmap = null
+      safeForceUnmap()
     }
     file.delete()
   }
@@ -178,11 +174,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     trimToValidSize()
   }
 
-  def closeHandler() = {
-    // File handler of the index field will be closed after the mmap is garbage collected
-    CoreUtils.swallow(forceUnmap(mmap))
-    mmap = null
-  }
+  def closeHandler(): Unit = safeForceUnmap()
 
   /**
    * Do a basic sanity check on this index to detect obvious problems
@@ -202,22 +194,19 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    */
   def truncateTo(offset: Long): Unit
 
+  protected def safeForceUnmap(): Unit = {
+    try forceUnmap()
+    catch {
+      case t: Throwable => error(s"Error unmapping index $file", t)
+    }
+  }
+
   /**
    * Forcefully free the buffer's mmap.
    */
-  protected def forceUnmap(m: MappedByteBuffer) {
-    try {
-      m match {
-        case buffer: DirectBuffer =>
-          val bufferCleaner = buffer.cleaner()
-          /* cleaner can be null if the mapped region has size 0 */
-          if (bufferCleaner != null)
-            bufferCleaner.clean()
-        case _ =>
-      }
-    } catch {
-      case t: Throwable => error("Error when freeing index buffer", t)
-    }
+  protected[log] def forceUnmap() {
+    try MappedByteBuffers.unmap(file.getAbsolutePath, mmap)
+    finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 506d99c..8fa3cc1 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -18,10 +18,14 @@
 package kafka.log
 
 import java.io._
+import java.nio.file.Files
+
 import org.junit.Assert._
-import java.util.{Collections, Arrays}
+import java.util.{Arrays, Collections}
+
 import org.junit._
 import org.scalatest.junit.JUnitSuite
+
 import scala.collection._
 import scala.util.Random
 import kafka.utils.TestUtils
@@ -34,7 +38,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -135,7 +139,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+	val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
 	idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
@@ -165,6 +169,14 @@ class OffsetIndexTest extends JUnitSuite {
     assertEquals("Full truncation should leave no entries", 0, idx.entries)
     idx.append(0, 0)
   }
+
+  @Test
+  def forceUnmapTest(): Unit = {
+    val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+    idx.forceUnmap()
+    // mmap should be null after unmap causing lookup to throw a NPE
+    intercept[NullPointerException](idx.lookup(1))
+  }
   
   def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
     try {
@@ -186,9 +198,10 @@ class OffsetIndexTest extends JUnitSuite {
     vals
   }
   
-  def nonExistantTempFile(): File = {
+  def nonExistentTempFile(): File = {
     val file = TestUtils.tempFile()
-    file.delete()
+    Files.delete(file.toPath)
     file
   }
+
 }