You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/16 08:38:56 UTC

[incubator-celeborn] branch main updated: [CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a6e89f3b [CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)
a6e89f3b is described below

commit a6e89f3b63733c3f35689871dbacec0c0c5bc290
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Nov 16 16:38:51 2022 +0800

    [CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)
---
 .../common/network/server/MemoryTracker.java       |  31 +++++-
 .../apache/celeborn/common/unsafe/Platform.java    | 110 ++++++++++++++++++---
 2 files changed, 125 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java b/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
index 3a1ccbbf..eb7aa986 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
@@ -18,6 +18,8 @@
 package org.apache.celeborn.common.network.server;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -27,10 +29,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
+import com.google.common.base.Preconditions;
 import io.netty.util.internal.PlatformDependent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.misc.VM;
 
 import org.apache.celeborn.common.protocol.TransportModuleConstants;
 import org.apache.celeborn.common.util.ThreadUtils;
@@ -39,7 +41,7 @@ import org.apache.celeborn.common.util.Utils;
 public class MemoryTracker {
   private static final Logger logger = LoggerFactory.getLogger(MemoryTracker.class);
   private static volatile MemoryTracker _INSTANCE = null;
-  private final long maxDirectorMemory = VM.maxDirectMemory();
+  private long maxDirectorMemory = 0;
   private final long pausePushDataThreshold;
   private final long pauseReplicateThreshold;
   private final long resumeThreshold;
@@ -101,6 +103,31 @@ public class MemoryTracker {
       double maxSortMemRatio,
       long checkInterval,
       long reportInterval) {
+    String[][] providers =
+        new String[][] {
+          {"sun.misc.VM", "maxDirectMemory"},
+          {"jdk.internal.misc.VM", "maxDirectMemory"}
+        };
+
+    Method maxMemMethod = null;
+    for (String[] provider : providers) {
+      String clazz = provider[0];
+      String method = provider[1];
+      try {
+        Class<?> vmClass = Class.forName(clazz);
+        maxMemMethod = vmClass.getDeclaredMethod(method);
+
+        maxMemMethod.setAccessible(true);
+        maxDirectorMemory = (long) maxMemMethod.invoke(null);
+        break;
+      } catch (ClassNotFoundException
+          | NoSuchMethodException
+          | IllegalAccessException
+          | InvocationTargetException ignored) {
+        // Ignore Exception
+      }
+    }
+    Preconditions.checkArgument(maxDirectorMemory > 0);
     maxSortMemory = ((long) (maxDirectorMemory * maxSortMemRatio));
     pausePushDataThreshold = (long) (maxDirectorMemory * pausePushDataRatio);
     pauseReplicateThreshold = (long) (maxDirectorMemory * pauseReplicateRatio);
diff --git a/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java b/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
index 8d53a277..c8673c51 100644
--- a/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
+++ b/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
@@ -19,10 +19,12 @@ package org.apache.celeborn.common.unsafe;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
-import sun.misc.Cleaner;
+import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
 import sun.misc.Unsafe;
 
 public final class Platform {
@@ -68,6 +70,73 @@ public final class Platform {
     unaligned = _unaligned;
   }
 
+  // Access fields and constructors once and store them, for performance:
+  private static final Constructor<?> DBB_CONSTRUCTOR;
+  private static final Field DBB_CLEANER_FIELD;
+  private static final Method CLEANER_CREATE_METHOD;
+
+  static {
+    // At the end of this block, CLEANER_CREATE_METHOD should be non-null iff it's possible to use
+    // reflection to invoke it, which is not necessarily possible by default in Java 9+.
+    // Code below can test for null to see whether to use it.
+
+    // Cleaner moved from sun.misc (JDK 8) to jdk.internal.ref (JDK 9+)
+    String cleanerClassName;
+    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+      cleanerClassName = "jdk.internal.ref.Cleaner";
+    } else {
+      cleanerClassName = "sun.misc.Cleaner";
+    }
+    try {
+      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      try {
+        constructor.setAccessible(true);
+        cleanerField.setAccessible(true);
+      } catch (RuntimeException re) {
+        // InaccessibleObjectException is Java 9+ only, so match it by name rather than import it
+        if ("InaccessibleObjectException".equals(re.getClass().getSimpleName())) {
+          // Continue, but the constructor/field are not available
+          // See comment below for more context
+          constructor = null;
+          cleanerField = null;
+        } else {
+          throw re;
+        }
+      }
+      // Have to set these values no matter what:
+      DBB_CONSTRUCTOR = constructor;
+      DBB_CLEANER_FIELD = cleanerField;
+
+      // no point continuing if the above failed:
+      if (DBB_CONSTRUCTOR != null && DBB_CLEANER_FIELD != null) {
+        Class<?> cleanerClass = Class.forName(cleanerClassName);
+        Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
+        // Accessing jdk.internal.ref.Cleaner should actually fail by default in JDK 9+,
+        // unfortunately, unless the user has allowed access with something like
+        // --add-opens java.base/jdk.internal.ref=ALL-UNNAMED. If not, we can't use the Cleaner
+        // hack below. It doesn't break, just means the user might run into the default JVM limit
+        // on off-heap memory and must increase it or set the flag above. This tests whether
+        // it's available:
+        try {
+          createMethod.invoke(null, null, null); // probe call with null arguments
+        } catch (IllegalAccessException e) {
+          // Don't throw; a null method marks the Cleaner path as unavailable (nothing is logged)
+          createMethod = null;
+        }
+        CLEANER_CREATE_METHOD = createMethod;
+      } else {
+        CLEANER_CREATE_METHOD = null;
+      }
+    } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
+      // These are all fatal in any Java version - rethrow (have to wrap as this is a static block)
+      throw new IllegalStateException(e);
+    } catch (InvocationTargetException ite) {
+      throw new IllegalStateException(ite.getCause());
+    }
+  }
+
   /**
    * @return true when running JVM is having sun's Unsafe package available in it and underlying
    *     system having unaligned-access capability.
@@ -155,23 +224,36 @@ public final class Platform {
     return newMemory;
   }
 
-  /**
-   * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
-   * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users to
-   * increase it).
-   */
+  /** Allocate a DirectByteBuffer, potentially bypassing the JVM's MaxDirectMemorySize limit. */
   @SuppressWarnings("unchecked")
   public static ByteBuffer allocateDirectBuffer(int size) {
     try {
-      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
-      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
-      constructor.setAccessible(true);
-      Field cleanerField = cls.getDeclaredField("cleaner");
-      cleanerField.setAccessible(true);
+      if (CLEANER_CREATE_METHOD == null) {
+        // Can't set a Cleaner (see comments on field), so need to allocate via normal Java APIs
+        try {
+          return ByteBuffer.allocateDirect(size);
+        } catch (OutOfMemoryError oome) {
+          // checkstyle.off: RegexpSinglelineJava
+          throw new OutOfMemoryError(
+              "Failed to allocate direct buffer ("
+                  + oome.getMessage()
+                  + "); try increasing -XX:MaxDirectMemorySize=... to, for example, your heap size");
+          // checkstyle.on: RegexpSinglelineJava
+        }
+      }
+      // Otherwise, use internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
+      // MaxDirectMemorySize limit (the default limit is too low and we do not want to
+      // require users to increase it).
       long memory = allocateMemory(size);
-      ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
-      Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
-      cleanerField.set(buffer, cleaner);
+      ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
+      try {
+        DBB_CLEANER_FIELD.set(
+            buffer,
+            CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        freeMemory(memory);
+        throw new IllegalStateException(e);
+      }
       return buffer;
     } catch (Exception e) {
       throwException(e);