You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by fo...@apache.org on 2019/07/04 08:55:40 UTC
[parquet-mr] branch master updated: PARQUET-1550: CleanUtil does not work in Java 11 (#654)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0b909ab PARQUET-1550: CleanUtil does not work in Java 11 (#654)
0b909ab is described below
commit 0b909ab3ed9fd9411092de359d4217aa6c9abc21
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Thu Jul 4 10:55:34 2019 +0200
PARQUET-1550: CleanUtil does not work in Java 11 (#654)
CleanUtil does not work in Java 11
---
parquet-hadoop/pom.xml | 5 ++
.../org/apache/parquet/hadoop/codec/CleanUtil.java | 98 +++++++++++++++-------
.../parquet/hadoop/codec/SnappyCompressor.java | 8 +-
.../parquet/hadoop/codec/SnappyDecompressor.java | 12 +--
4 files changed, 85 insertions(+), 38 deletions(-)
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ff2829d..d4fbf05 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -86,6 +86,11 @@
<version>1.6</version>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>${fastutil.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.github.rdblue</groupId>
<artifactId>brotli-codec</artifactId>
<version>${brotli-codec.version}</version>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
index 8bf24b2..82b1414 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -30,41 +30,83 @@ import org.slf4j.LoggerFactory;
* A Helper class which use reflections to clean up DirectBuffer. It's implemented for
* better compatibility with both java8 and java9+, because the Cleaner class is moved to
* another place since java9+.
+ *
+ * Strongly inspired by:
+ * https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java
*/
-public class CleanUtil {
+public class CleanUtil
+{
private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
- private static final Field CLEANER_FIELD;
- private static final Method CLEAN_METHOD;
+
+ private static final Object unsafe;
+ private static final Method cleanerMethod;
+ private static final Method cleanMethod;
+ private static final Method invokeCleanerMethod;
+
+ private static final int majorVersion =
+ Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
static {
- ByteBuffer buf = null;
- Field cleanerField = null;
- Method cleanMethod = null;
- try {
- buf = ByteBuffer.allocateDirect(1);
- cleanerField = buf.getClass().getDeclaredField("cleaner");
- cleanerField.setAccessible(true);
- Object cleaner = cleanerField.get(buf);
- cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
- } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
- logger.warn("Initialization failed for cleanerField or cleanMethod", e);
- } finally {
- clean(buf);
+ final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0);
+ Method cleanerMethodLocal = null;
+ Method cleanMethodLocal = null;
+ Object unsafeLocal = null;
+ Method invokeCleanerMethodLocal = null;
+ if (majorVersion >= 9) {
+ try {
+ final Class<?> clazz = Class.forName("sun.misc.Unsafe");
+ final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
+ theUnsafe.setAccessible(true);
+ unsafeLocal = theUnsafe.get(null);
+ invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
+ invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
+ } catch (IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | NoSuchMethodException | SecurityException
+ | ClassNotFoundException | NoSuchFieldException e) {
+ logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+ unsafeLocal = null;
+ invokeCleanerMethodLocal = null;
+ }
+ } else {
+ try {
+ cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner");
+ cleanerMethodLocal.setAccessible(true);
+ final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer);
+ cleanMethodLocal = cleanerObject.getClass().getMethod("clean");
+ cleanMethodLocal.invoke(cleanerObject);
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException |
+ IllegalArgumentException | InvocationTargetException e) {
+ logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+ cleanerMethodLocal = null;
+ cleanMethodLocal = null;
+ }
}
- CLEANER_FIELD = cleanerField;
- CLEAN_METHOD = cleanMethod;
+ cleanerMethod = cleanerMethodLocal;
+ cleanMethod = cleanMethodLocal;
+ unsafe = unsafeLocal;
+ invokeCleanerMethod = invokeCleanerMethodLocal;
}
- public static void clean(ByteBuffer buffer) {
- if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
- return;
- }
- try {
- Object cleaner = CLEANER_FIELD.get(buffer);
- CLEAN_METHOD.invoke(cleaner);
- } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
- // Ignore clean failure
- logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
+ private CleanUtil() {
+ // Hide the default constructor since this is a utility class.
+ }
+
+ public static void cleanDirectBuffer(ByteBuffer buf) {
+ if (cleanMethod != null) {
+ try {
+ cleanMethod.invoke(cleanerMethod.invoke(buf));
+ } catch (IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | SecurityException e) {
+ logger.warn("Error while cleaning up the DirectBuffer", e);
+ }
+ } else if (invokeCleanerMethod != null) {
+ try {
+ invokeCleanerMethod.invoke(unsafe, buf);
+ } catch (IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | SecurityException e) {
+ logger.warn("Error while cleaning up the DirectBuffer", e);
+ }
}
}
+
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index 4720c08..1d2bf61 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -68,7 +68,7 @@ public class SnappyCompressor implements Compressor {
if (maxOutputSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
- CleanUtil.clean(oldBuffer);
+ CleanUtil.cleanDirectBuffer(oldBuffer);
}
// Reset the previous outputBuffer
outputBuffer.clear();
@@ -101,7 +101,7 @@ public class SnappyCompressor implements Compressor {
tmp.put(inputBuffer);
ByteBuffer oldBuffer = inputBuffer;
inputBuffer = tmp;
- CleanUtil.clean(oldBuffer);
+ CleanUtil.cleanDirectBuffer(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
@@ -113,8 +113,8 @@ public class SnappyCompressor implements Compressor {
@Override
public void end() {
- CleanUtil.clean(inputBuffer);
- CleanUtil.clean(outputBuffer);
+ CleanUtil.cleanDirectBuffer(inputBuffer);
+ CleanUtil.cleanDirectBuffer(outputBuffer);
}
@Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index c3da63f..2e0c558 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -63,7 +63,7 @@ public class SnappyDecompressor implements Decompressor {
if (decompressedSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
- CleanUtil.clean(oldBuffer);
+ CleanUtil.cleanDirectBuffer(oldBuffer);
}
// Reset the previous outputBuffer (i.e. set position to 0)
@@ -101,12 +101,12 @@ public class SnappyDecompressor implements Decompressor {
SnappyUtil.validateBuffer(buffer, off, len);
if (inputBuffer.capacity() - inputBuffer.position() < len) {
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+ final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
inputBuffer.rewind();
newBuffer.put(inputBuffer);
- ByteBuffer oldBuffer = inputBuffer;
+ final ByteBuffer oldBuffer = inputBuffer;
inputBuffer = newBuffer;
- CleanUtil.clean(oldBuffer);
+ CleanUtil.cleanDirectBuffer(oldBuffer);
} else {
inputBuffer.limit(inputBuffer.position() + len);
}
@@ -115,8 +115,8 @@ public class SnappyDecompressor implements Decompressor {
@Override
public void end() {
- CleanUtil.clean(inputBuffer);
- CleanUtil.clean(outputBuffer);
+ CleanUtil.cleanDirectBuffer(inputBuffer);
+ CleanUtil.cleanDirectBuffer(outputBuffer);
}
@Override