You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by fo...@apache.org on 2019/07/04 08:55:40 UTC

[parquet-mr] branch master updated: PARQUET-1550: CleanUtil does not work in Java 11 (#654)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b909ab  PARQUET-1550: CleanUtil does not work in Java 11 (#654)
0b909ab is described below

commit 0b909ab3ed9fd9411092de359d4217aa6c9abc21
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Thu Jul 4 10:55:34 2019 +0200

    PARQUET-1550: CleanUtil does not work in Java 11 (#654)
    
    CleanUtil does not work in Java 11
---
 parquet-hadoop/pom.xml                             |  5 ++
 .../org/apache/parquet/hadoop/codec/CleanUtil.java | 98 +++++++++++++++-------
 .../parquet/hadoop/codec/SnappyCompressor.java     |  8 +-
 .../parquet/hadoop/codec/SnappyDecompressor.java   | 12 +--
 4 files changed, 85 insertions(+), 38 deletions(-)

diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ff2829d..d4fbf05 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -86,6 +86,11 @@
       <version>1.6</version>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>${fastutil.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.github.rdblue</groupId>
       <artifactId>brotli-codec</artifactId>
       <version>${brotli-codec.version}</version>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
index 8bf24b2..82b1414 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -30,41 +30,83 @@ import org.slf4j.LoggerFactory;
  * A Helper class which use reflections to clean up DirectBuffer. It's implemented for
  * better compatibility with both java8 and java9+, because the Cleaner class is moved to
  * another place since java9+.
+ *
+ * Strongly inspired by:
+ * https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java
  */
-public class CleanUtil {
+public class CleanUtil
+{
   private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
-  private static final Field CLEANER_FIELD;
-  private static final Method CLEAN_METHOD;
+
+  private static final Object unsafe;
+  private static final Method cleanerMethod;
+  private static final Method cleanMethod;
+  private static final Method invokeCleanerMethod;
+
+  private static final int majorVersion =
+    Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
 
   static {
-    ByteBuffer buf = null;
-    Field cleanerField = null;
-    Method cleanMethod = null;
-    try {
-      buf = ByteBuffer.allocateDirect(1);
-      cleanerField = buf.getClass().getDeclaredField("cleaner");
-      cleanerField.setAccessible(true);
-      Object cleaner = cleanerField.get(buf);
-      cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
-    } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
-      logger.warn("Initialization failed for cleanerField or cleanMethod", e);
-    } finally {
-      clean(buf);
+    final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0);
+    Method cleanerMethodLocal = null;
+    Method cleanMethodLocal = null;
+    Object unsafeLocal = null;
+    Method invokeCleanerMethodLocal = null;
+    if (majorVersion >= 9) {
+      try {
+        final Class<?> clazz = Class.forName("sun.misc.Unsafe");
+        final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
+        theUnsafe.setAccessible(true);
+        unsafeLocal = theUnsafe.get(null);
+        invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
+        invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
+      } catch (IllegalAccessException | IllegalArgumentException
+        | InvocationTargetException | NoSuchMethodException | SecurityException
+        | ClassNotFoundException | NoSuchFieldException e) {
+        logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+        unsafeLocal = null;
+        invokeCleanerMethodLocal = null;
+      }
+    } else {
+      try {
+        cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner");
+        cleanerMethodLocal.setAccessible(true);
+        final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer);
+        cleanMethodLocal = cleanerObject.getClass().getMethod("clean");
+        cleanMethodLocal.invoke(cleanerObject);
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException |
+        IllegalArgumentException | InvocationTargetException e) {
+        logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
+        cleanerMethodLocal = null;
+        cleanMethodLocal = null;
+      }
     }
-    CLEANER_FIELD = cleanerField;
-    CLEAN_METHOD = cleanMethod;
+    cleanerMethod = cleanerMethodLocal;
+    cleanMethod = cleanMethodLocal;
+    unsafe = unsafeLocal;
+    invokeCleanerMethod = invokeCleanerMethodLocal;
   }
 
-  public static void clean(ByteBuffer buffer) {
-    if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
-      return;
-    }
-    try {
-      Object cleaner = CLEANER_FIELD.get(buffer);
-      CLEAN_METHOD.invoke(cleaner);
-    } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
-      // Ignore clean failure
-      logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
+  private CleanUtil() {
+    // Hide the default constructor since this is a utility class.
+  }
+
+  public static void cleanDirectBuffer(ByteBuffer buf) {
+    if (cleanMethod != null) {
+      try {
+        cleanMethod.invoke(cleanerMethod.invoke(buf));
+      } catch (IllegalAccessException | IllegalArgumentException
+        | InvocationTargetException | SecurityException e) {
+        logger.warn("Error while cleaning up the DirectBuffer", e);
+      }
+    } else if (invokeCleanerMethod != null) {
+      try {
+        invokeCleanerMethod.invoke(unsafe, buf);
+      } catch (IllegalAccessException | IllegalArgumentException
+        | InvocationTargetException | SecurityException e) {
+        logger.warn("Error while cleaning up the DirectBuffer", e);
+      }
     }
   }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index 4720c08..1d2bf61 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -68,7 +68,7 @@ public class SnappyCompressor implements Compressor {
       if (maxOutputSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
-        CleanUtil.clean(oldBuffer);
+        CleanUtil.cleanDirectBuffer(oldBuffer);
       }
       // Reset the previous outputBuffer
       outputBuffer.clear();
@@ -101,7 +101,7 @@ public class SnappyCompressor implements Compressor {
       tmp.put(inputBuffer);
       ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = tmp;
-      CleanUtil.clean(oldBuffer);
+      CleanUtil.cleanDirectBuffer(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -113,8 +113,8 @@ public class SnappyCompressor implements Compressor {
 
   @Override
   public void end() {
-    CleanUtil.clean(inputBuffer);
-    CleanUtil.clean(outputBuffer);
+    CleanUtil.cleanDirectBuffer(inputBuffer);
+    CleanUtil.cleanDirectBuffer(outputBuffer);
   }
 
   @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index c3da63f..2e0c558 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -63,7 +63,7 @@ public class SnappyDecompressor implements Decompressor {
       if (decompressedSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
-        CleanUtil.clean(oldBuffer);
+        CleanUtil.cleanDirectBuffer(oldBuffer);
       }
 
       // Reset the previous outputBuffer (i.e. set position to 0)
@@ -101,12 +101,12 @@ public class SnappyDecompressor implements Decompressor {
     SnappyUtil.validateBuffer(buffer, off, len);
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
-      ByteBuffer oldBuffer = inputBuffer;
+      final ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = newBuffer;
-      CleanUtil.clean(oldBuffer);
+      CleanUtil.cleanDirectBuffer(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -115,8 +115,8 @@ public class SnappyDecompressor implements Decompressor {
 
   @Override
   public void end() {
-    CleanUtil.clean(inputBuffer);
-    CleanUtil.clean(outputBuffer);
+    CleanUtil.cleanDirectBuffer(inputBuffer);
+    CleanUtil.cleanDirectBuffer(outputBuffer);
   }
 
   @Override