You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/22 18:32:24 UTC
kafka git commit: KAFKA-5915; Support unmapping of mapped/direct buffers in Java 9
Repository: kafka
Updated Branches:
refs/heads/trunk 125d8d6f7 -> e554dc518
KAFKA-5915; Support unmapping of mapped/direct buffers in Java 9
As mentioned in the class documentation of MappedByteBuffers, the unmap
implementation was inspired by Lucene's MMapDirectory:
https://github.com/apache/lucene-solr/blob/releases/lucene-solr/6.6.1/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java#L315
Without this change, unmapping fails with the following message:
> java.lang.IllegalAccessError: class kafka.log.AbstractIndex (in unnamed module 0x45103d6b) cannot access class jdk.internal.ref.Cleaner (in module java.base) because module java.base does not export jdk.internal.ref to unnamed module 0x45103d6b
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3879 from ijuma/kafka-5915-unmap-mapped-buffers-java-9
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e554dc51
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e554dc51
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e554dc51
Branch: refs/heads/trunk
Commit: e554dc518eaaa0747899e708160275f95c4e525f
Parents: 125d8d6
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Sep 22 19:32:09 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 22 19:32:09 2017 +0100
----------------------------------------------------------------------
.../kafka/common/utils/MappedByteBuffers.java | 136 +++++++++++++++++++
.../common/utils/MappedByteBuffersTest.java | 41 ++++++
.../main/scala/kafka/log/AbstractIndex.scala | 39 ++----
.../scala/unit/kafka/log/OffsetIndexTest.scala | 23 +++-
4 files changed, 209 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
new file mode 100644
index 0000000..1faecb3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static java.lang.invoke.MethodHandles.constant;
+import static java.lang.invoke.MethodHandles.dropArguments;
+import static java.lang.invoke.MethodHandles.filterReturnValue;
+import static java.lang.invoke.MethodHandles.guardWithTest;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.lang.invoke.MethodType.methodType;
+
+/**
+ * Utility methods for MappedByteBuffer implementations.
+ *
+ * The unmap implementation was inspired by the one in Lucene's MMapDirectory.
+ *
+ * The unmap strategy is resolved once, in the static initializer. When {@code Java.IS_JAVA9_COMPATIBLE} is
+ * true, {@code sun.misc.Unsafe#invokeCleaner(ByteBuffer)} is bound to the {@code theUnsafe} instance; otherwise
+ * a handle is composed that calls {@code cleaner()} on the direct buffer and then {@code clean()} on the
+ * result if it is not null. If the lookup fails, the failure is remembered and reported by {@link #unmap}.
+ */
+public final class MappedByteBuffers {
+
+    // NOTE(review): not referenced anywhere in this class as shown.
+    private static final Logger log = LoggerFactory.getLogger(MappedByteBuffers.class);
+
+    // null if unmap is not supported
+    private static final MethodHandle UNMAP;
+
+    // null if unmap is supported
+    private static final RuntimeException UNMAP_NOT_SUPPORTED_EXCEPTION;
+
+    static {
+        MethodHandle unmap = null;
+        RuntimeException exception = null;
+        try {
+            unmap = lookupUnmapMethodHandle();
+        } catch (RuntimeException e) {
+            exception = e;
+        }
+        UNMAP = unmap;
+        UNMAP_NOT_SUPPORTED_EXCEPTION = unmap == null ? exception : null;
+    }
+
+    private MappedByteBuffers() {}
+
+    /**
+     * Unmaps the given mapped buffer using the handle resolved at class initialization.
+     *
+     * @param resourceDescription description of the mapped resource, used only in the error message
+     * @param buffer the buffer to unmap; it must be a direct buffer
+     * @throws IllegalArgumentException if {@code buffer} is not direct
+     * @throws UnsupportedOperationException if no unmap handle could be resolved on this platform; the
+     *         lookup failure recorded at class initialization is attached as the cause
+     * @throws IOException if invoking the unmap handle fails; the original throwable is attached as the cause
+     */
+    public static void unmap(String resourceDescription, MappedByteBuffer buffer) throws IOException {
+        if (!buffer.isDirect())
+            throw new IllegalArgumentException("Unmapping only works with direct buffers");
+        if (UNMAP == null) {
+            // Throw a fresh exception instead of the cached instance: the stack trace then points at this call
+            // and callers never share a single mutable Throwable.
+            throw new UnsupportedOperationException(UNMAP_NOT_SUPPORTED_EXCEPTION.getMessage(),
+                    UNMAP_NOT_SUPPORTED_EXCEPTION);
+        }
+
+        try {
+            // invokeExact requires the call site to match the handle type exactly, hence the cast to ByteBuffer
+            UNMAP.invokeExact((ByteBuffer) buffer);
+        } catch (Throwable throwable) {
+            throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, throwable);
+        }
+    }
+
+    /**
+     * Resolves the unmap handle for the running Java version.
+     *
+     * @return a handle of type {@code (ByteBuffer)void}
+     * @throws UnsupportedOperationException if the required internal APIs cannot be resolved
+     */
+    private static MethodHandle lookupUnmapMethodHandle() {
+        final MethodHandles.Lookup lookup = lookup();
+        try {
+            if (Java.IS_JAVA9_COMPATIBLE)
+                return unmapJava9(lookup);
+            else
+                return unmapJava7Or8(lookup);
+        } catch (ReflectiveOperationException | RuntimeException e1) {
+            throw new UnsupportedOperationException("Unmapping is not supported on this platform, because internal " +
+                "Java APIs are not compatible with this Kafka version", e1);
+        }
+    }
+
+    /**
+     * Builds the unmap handle from {@code java.nio.DirectByteBuffer#cleaner()} and the {@code clean()} method
+     * of the type it returns.
+     */
+    private static MethodHandle unmapJava7Or8(MethodHandles.Lookup lookup) throws ReflectiveOperationException {
+        /* "Compile" a MethodHandle that is roughly equivalent to the following lambda:
+         *
+         * (ByteBuffer buffer) -> {
+         *   sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) buffer).cleaner();
+         *   if (nonNull(cleaner))
+         *     cleaner.clean();
+         *   else
+         *     noop(cleaner); // the noop is needed because MethodHandles#guardWithTest always needs both if and else
+         * }
+         */
+        Class<?> directBufferClass = Class.forName("java.nio.DirectByteBuffer");
+        Method m = directBufferClass.getMethod("cleaner");
+        m.setAccessible(true);
+        MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
+        // The cleaner type is taken from the handle's return type rather than being named directly
+        Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
+        MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
+        MethodHandle nonNullTest = lookup.findStatic(MappedByteBuffers.class, "nonNull",
+            methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
+        MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
+        MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
+            .asType(methodType(void.class, ByteBuffer.class));
+        return unmapper;
+    }
+
+    /**
+     * Builds the unmap handle from {@code sun.misc.Unsafe#invokeCleaner(ByteBuffer)}, bound to the instance
+     * read reflectively from the static {@code theUnsafe} field.
+     */
+    private static MethodHandle unmapJava9(MethodHandles.Lookup lookup) throws ReflectiveOperationException {
+        Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+        MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner",
+            methodType(void.class, ByteBuffer.class));
+        Field f = unsafeClass.getDeclaredField("theUnsafe");
+        f.setAccessible(true);
+        Object theUnsafe = f.get(null);
+        return unmapper.bindTo(theUnsafe);
+    }
+
+    /** Null test used as the guard of the Java 7/8 handle; looked up by name via {@code findStatic}. */
+    private static boolean nonNull(Object o) {
+        return o != null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
new file mode 100644
index 0000000..38fe9dd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class MappedByteBuffersTest {
+
+    /**
+     * Checks that unmap doesn't throw exceptions for a zero-length mapping.
+     */
+    @Test
+    public void testUnmap() throws Exception {
+        File file = TestUtils.tempFile();
+        try (FileChannel channel = FileChannel.open(file.toPath())) {
+            MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_ONLY, 0, 0);
+            MappedByteBuffers.unmap(file.getAbsolutePath(), map);
+        }
+    }
+
+    /**
+     * Checks that unmap doesn't throw exceptions for a non-empty mapping. A zero-length mapping may have
+     * no cleaner at all, in which case only the no-op branch of the unmap handle would be exercised.
+     */
+    @Test
+    public void testUnmapNonEmptyMapping() throws Exception {
+        File file = TestUtils.tempFile();
+        // Give the file some content so that a non-empty read-only region can be mapped
+        java.nio.file.Files.write(file.toPath(), new byte[] {1, 2, 3, 4});
+        try (FileChannel channel = FileChannel.open(file.toPath())) {
+            MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_ONLY, 0, 4);
+            MappedByteBuffers.unmap(file.getAbsolutePath(), map);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 2d7cc7e..40ec870 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -25,8 +25,7 @@ import java.util.concurrent.locks.{Lock, ReentrantLock}
import kafka.log.IndexSearchType.IndexSearchEntity
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging}
-import org.apache.kafka.common.utils.{OperatingSystem, Utils}
-import sun.nio.ch.DirectBuffer
+import org.apache.kafka.common.utils.{MappedByteBuffers, OperatingSystem, Utils}
import scala.math.ceil
@@ -109,7 +108,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
/* Windows won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS)
- forceUnmap(mmap)
+ safeForceUnmap()
try {
raf.setLength(roundedNewSize)
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
@@ -150,10 +149,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
- CoreUtils.swallow(forceUnmap(mmap))
- // Accessing unmapped mmap crashes JVM by SEGV.
- // Accessing it after this method called sounds like a bug but for safety, assign null and do not allow later access.
- mmap = null
+ safeForceUnmap()
}
file.delete()
}
@@ -178,11 +174,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
trimToValidSize()
}
- def closeHandler() = {
- // File handler of the index field will be closed after the mmap is garbage collected
- CoreUtils.swallow(forceUnmap(mmap))
- mmap = null
- }
+ def closeHandler(): Unit = safeForceUnmap()
/**
* Do a basic sanity check on this index to detect obvious problems
@@ -202,22 +194,19 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
*/
def truncateTo(offset: Long): Unit
+  /**
+   * Unmaps the index buffer, logging any failure instead of propagating it.
+   *
+   * Does nothing when mmap is already null. forceUnmap() nulls mmap out, so without this guard a
+   * repeated call would pass null to MappedByteBuffers.unmap and log a spurious NullPointerException.
+   */
+  protected def safeForceUnmap(): Unit = {
+    if (mmap != null) {
+      try forceUnmap()
+      catch {
+        case t: Throwable => error(s"Error unmapping index $file", t)
+      }
+    }
+  }
+
/**
* Forcefully free the buffer's mmap.
*/
- protected def forceUnmap(m: MappedByteBuffer) {
- try {
- m match {
- case buffer: DirectBuffer =>
- val bufferCleaner = buffer.cleaner()
- /* cleaner can be null if the mapped region has size 0 */
- if (bufferCleaner != null)
- bufferCleaner.clean()
- case _ =>
- }
- } catch {
- case t: Throwable => error("Error when freeing index buffer", t)
- }
+  protected[log] def forceUnmap(): Unit = {
+    try {
+      // Failures from the unmap call propagate to the caller
+      MappedByteBuffers.unmap(file.getAbsolutePath, mmap)
+    } finally {
+      // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
+      mmap = null
+    }
+  }
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 506d99c..8fa3cc1 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -18,10 +18,14 @@
package kafka.log
import java.io._
+import java.nio.file.Files
+
import org.junit.Assert._
-import java.util.{Collections, Arrays}
+import java.util.{Arrays, Collections}
+
import org.junit._
import org.scalatest.junit.JUnitSuite
+
import scala.collection._
import scala.util.Random
import kafka.utils.TestUtils
@@ -34,7 +38,7 @@ class OffsetIndexTest extends JUnitSuite {
@Before
def setup() {
- this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+ this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
}
@After
@@ -135,7 +139,7 @@ class OffsetIndexTest extends JUnitSuite {
@Test
def truncate() {
- val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+ val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
idx.truncate()
for(i <- 1 until 10)
idx.append(i, i)
@@ -165,6 +169,14 @@ class OffsetIndexTest extends JUnitSuite {
assertEquals("Full truncation should leave no entries", 0, idx.entries)
idx.append(0, 0)
}
+
+  @Test
+  def forceUnmapTest(): Unit = {
+    val index = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+    index.forceUnmap()
+    // mmap should be null after unmap causing lookup to throw a NPE
+    intercept[NullPointerException] {
+      index.lookup(1)
+    }
+  }
def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
try {
@@ -186,9 +198,10 @@ class OffsetIndexTest extends JUnitSuite {
vals
}
- def nonExistantTempFile(): File = {
+ def nonExistentTempFile(): File = {
val file = TestUtils.tempFile()
- file.delete()
+ Files.delete(file.toPath)
file
}
+
}