You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ya...@apache.org on 2022/09/06 09:58:22 UTC

[incubator-uniffle] 01/01: fix

This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch uniffle-144
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 912b9293e1bef0bba32e8c6758eb08ce972f59d9
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Sep 6 17:58:02 2022 +0800

    fix
---
 .../org/apache/uniffle/common/RssShuffleUtils.java | 13 ++------
 .../apache/uniffle/common/RssShuffleUtilsTest.java | 37 ++++++++++++++++++----
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
index 58db058e..c8c96bfd 100644
--- a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
@@ -17,8 +17,6 @@
 
 package org.apache.uniffle.common;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
 import com.google.common.base.Preconditions;
@@ -27,6 +25,7 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
 
 public class RssShuffleUtils {
 
@@ -73,15 +72,9 @@ public class RssShuffleUtils {
    *
    */
   public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
-          throws IllegalArgumentException, IllegalAccessException,
-          InvocationTargetException, SecurityException, NoSuchMethodException {
+          throws IllegalArgumentException, SecurityException {
     Preconditions.checkArgument(toBeDestroyed.isDirect(),
             "toBeDestroyed isn't direct!");
-    Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
-    cleanerMethod.setAccessible(true);
-    Object cleaner = cleanerMethod.invoke(toBeDestroyed);
-    Method cleanMethod = cleaner.getClass().getMethod("clean");
-    cleanMethod.setAccessible(true);
-    cleanMethod.invoke(cleaner);
+    ((DirectBuffer)toBeDestroyed).cleaner().clean();
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
index 1dd51ebf..06baa874 100644
--- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
@@ -17,17 +17,20 @@
 
 package org.apache.uniffle.common;
 
+import java.lang.reflect.Field;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
+import sun.misc.Unsafe;
+
 import org.apache.commons.lang3.RandomUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 
-public class RssShuffleUtilsTest {
+  public class RssShuffleUtilsTest {
 
   @ParameterizedTest
   @ValueSource(ints = {1, 1024, 128 * 1024, 512 * 1024, 1024 * 1024, 4 * 1024 * 1024})
@@ -51,15 +54,25 @@ public class RssShuffleUtilsTest {
   @Test
   public void testDestroyDirectByteBuffer() throws Exception {
     int size = 10;
-    byte b = 1;
+    byte b = 2;
     ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
     for (int i = 0; i < size; i++) {
       byteBuffer.put(b);
     }
     byteBuffer.flip();
+
+    // Get valid native pointer through `address` in `DirectByteBuffer`
+    Unsafe unsafe = getUnsafe();
+    long addressInByteBuffer = address(byteBuffer);
+    long originalAddress = unsafe.getAddress(addressInByteBuffer);
+
     RssShuffleUtils.destroyDirectByteBuffer(byteBuffer);
+
     // The memory may not be released fast enough.
-    Thread.sleep(200);
+    // If native pointer changes, `address` in `DirectByteBuffer` is invalid
+    while (unsafe.getAddress(addressInByteBuffer) == originalAddress) {
+      Thread.sleep(200);
+    }
     boolean same = true;
     byte[] read = new byte[size];
     byteBuffer.get(read);
@@ -69,6 +82,18 @@ public class RssShuffleUtilsTest {
         break;
       }
     }
-    assertTrue(!same);
+    assertFalse(same);
+  }
+
+  private Unsafe getUnsafe() throws NoSuchFieldException, IllegalAccessException {
+    Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
+    unsafeField.setAccessible(true);
+    return (sun.misc.Unsafe) unsafeField.get(null);
+  }
+
+  private long address(ByteBuffer buffer) throws NoSuchFieldException, IllegalAccessException {
+    Field addressField = Buffer.class.getDeclaredField("address");
+    addressField.setAccessible(true);
+    return (long) addressField.get(buffer);
   }
 }