You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/16 08:38:56 UTC
[incubator-celeborn] branch main updated: [CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a6e89f3b [CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)
a6e89f3b is described below
commit a6e89f3b63733c3f35689871dbacec0c0c5bc290
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Nov 16 16:38:51 2022 +0800
[CELEBORN-9] [ISSUE-861] Support multiple JDK version build (#974)
---
.../common/network/server/MemoryTracker.java | 31 +++++-
.../apache/celeborn/common/unsafe/Platform.java | 110 ++++++++++++++++++---
2 files changed, 125 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java b/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
index 3a1ccbbf..eb7aa986 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/MemoryTracker.java
@@ -18,6 +18,8 @@
package org.apache.celeborn.common.network.server;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -27,10 +29,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import com.google.common.base.Preconditions;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.misc.VM;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.util.ThreadUtils;
@@ -39,7 +41,7 @@ import org.apache.celeborn.common.util.Utils;
public class MemoryTracker {
private static final Logger logger = LoggerFactory.getLogger(MemoryTracker.class);
private static volatile MemoryTracker _INSTANCE = null;
- private final long maxDirectorMemory = VM.maxDirectMemory();
+ private long maxDirectorMemory = 0;
private final long pausePushDataThreshold;
private final long pauseReplicateThreshold;
private final long resumeThreshold;
@@ -101,6 +103,31 @@ public class MemoryTracker {
double maxSortMemRatio,
long checkInterval,
long reportInterval) {
+ String[][] providers =
+ new String[][] {
+ {"sun.misc.VM", "maxDirectMemory"},
+ {"jdk.internal.misc.VM", "maxDirectMemory"}
+ };
+
+ Method maxMemMethod = null;
+ for (String[] provider : providers) {
+ String clazz = provider[0];
+ String method = provider[1];
+ try {
+ Class<?> vmClass = Class.forName(clazz);
+ maxMemMethod = vmClass.getDeclaredMethod(method);
+
+ maxMemMethod.setAccessible(true);
+ maxDirectorMemory = (long) maxMemMethod.invoke(null);
+ break;
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException ignored) {
+ // Ignore Exception
+ }
+ }
+ Preconditions.checkArgument(maxDirectorMemory > 0);
maxSortMemory = ((long) (maxDirectorMemory * maxSortMemRatio));
pausePushDataThreshold = (long) (maxDirectorMemory * pausePushDataRatio);
pauseReplicateThreshold = (long) (maxDirectorMemory * pauseReplicateRatio);
diff --git a/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java b/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
index 8d53a277..c8673c51 100644
--- a/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
+++ b/common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java
@@ -19,10 +19,12 @@ package org.apache.celeborn.common.unsafe;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import sun.misc.Cleaner;
+import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
import sun.misc.Unsafe;
public final class Platform {
@@ -68,6 +70,73 @@ public final class Platform {
unaligned = _unaligned;
}
+ // Access fields and constructors once and store them, for performance:
+ private static final Constructor<?> DBB_CONSTRUCTOR;
+ private static final Field DBB_CLEANER_FIELD;
+ private static final Method CLEANER_CREATE_METHOD;
+
+ static {
+ // At the end of this block, CLEANER_CREATE_METHOD should be non-null iff it's possible to use
+ // reflection to invoke it, which is not necessarily possible by default in Java 9+.
+ // Code below can test for null to see whether to use it.
+
+ // The implementation of Cleaner changed from JDK 8 to 9
+ String cleanerClassName;
+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+ cleanerClassName = "jdk.internal.ref.Cleaner";
+ } else {
+ cleanerClassName = "sun.misc.Cleaner";
+ }
+ try {
+ Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
+ Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+ Field cleanerField = cls.getDeclaredField("cleaner");
+ try {
+ constructor.setAccessible(true);
+ cleanerField.setAccessible(true);
+ } catch (RuntimeException re) {
+ // This is a Java 9+ exception, so needs to be handled without importing it
+ if ("InaccessibleObjectException".equals(re.getClass().getSimpleName())) {
+ // Continue, but the constructor/field are not available
+ // See comment below for more context
+ constructor = null;
+ cleanerField = null;
+ } else {
+ throw re;
+ }
+ }
+ // Have to set these values no matter what:
+ DBB_CONSTRUCTOR = constructor;
+ DBB_CLEANER_FIELD = cleanerField;
+
+ // no point continuing if the above failed:
+ if (DBB_CONSTRUCTOR != null && DBB_CLEANER_FIELD != null) {
+ Class<?> cleanerClass = Class.forName(cleanerClassName);
+ Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
+ // Accessing jdk.internal.ref.Cleaner should actually fail by default in JDK 9+,
+ // unfortunately, unless the user has allowed access with something like
+ // --add-opens java.base/java.lang=ALL-UNNAMED If not, we can't really use the Cleaner
+ // hack below. It doesn't break, just means the user might run into the default JVM limit
+ // on off-heap memory and increase it or set the flag above. This tests whether it's
+ // available:
+ try {
+ createMethod.invoke(null, null, null);
+ } catch (IllegalAccessException e) {
+ // Don't throw an exception, but can't log here?
+ createMethod = null;
+ }
+ CLEANER_CREATE_METHOD = createMethod;
+ } else {
+ CLEANER_CREATE_METHOD = null;
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
+ // These are all fatal in any Java version - rethrow (have to wrap as this is a static block)
+ throw new IllegalStateException(e);
+ } catch (InvocationTargetException ite) {
+ throw new IllegalStateException(ite.getCause());
+ }
+ }
+
/**
* @return true when running JVM is having sun's Unsafe package available in it and underlying
* system having unaligned-access capability.
@@ -155,23 +224,36 @@ public final class Platform {
return newMemory;
}
- /**
- * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
- * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users to
- * increase it).
- */
+ /** Allocate a DirectByteBuffer, potentially bypassing the JVM's MaxDirectMemorySize limit. */
@SuppressWarnings("unchecked")
public static ByteBuffer allocateDirectBuffer(int size) {
try {
- Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
- Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
- constructor.setAccessible(true);
- Field cleanerField = cls.getDeclaredField("cleaner");
- cleanerField.setAccessible(true);
+ if (CLEANER_CREATE_METHOD == null) {
+ // Can't set a Cleaner (see comments on field), so need to allocate via normal Java APIs
+ try {
+ return ByteBuffer.allocateDirect(size);
+ } catch (OutOfMemoryError oome) {
+ // checkstyle.off: RegexpSinglelineJava
+ throw new OutOfMemoryError(
+ "Failed to allocate direct buffer ("
+ + oome.getMessage()
+ + "); try increasing -XX:MaxDirectMemorySize=... to, for example, your heap size");
+ // checkstyle.on: RegexpSinglelineJava
+ }
+ }
+ // Otherwise, use internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
+ // MaxDirectMemorySize limit (the default limit is too low and we do not want to
+ // require users to increase it).
long memory = allocateMemory(size);
- ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
- cleanerField.set(buffer, cleaner);
+ ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
+ try {
+ DBB_CLEANER_FIELD.set(
+ buffer,
+ CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ freeMemory(memory);
+ throw new IllegalStateException(e);
+ }
return buffer;
} catch (Exception e) {
throwException(e);