You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/07/29 03:49:39 UTC

[incubator-uniffle] branch master updated: [Improvement] ShuffleBlock should be release when finished reading (#74)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb39ed  [Improvement] ShuffleBlock should be release when finished reading (#74)
ccb39ed is described below

commit ccb39ed95fa7cd8742ce323eb7096ddd1a09827b
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Jul 29 11:49:34 2022 +0800

    [Improvement] ShuffleBlock should be release when finished reading (#74)
    
    ### **What changes were proposed in this pull request?**
    Release the shuffle block when reading is finished.
    
    ### **Why are the changes needed?**
    We found that Spark executors are easily killed by YARN, and I found that this is because the executor uses too much off-heap memory when reading shuffle data.
    I found that most of the off-heap memory is used to store uncompressed shuffle data, and this memory is released only when GC is triggered.
    
    ### **Does this PR introduce any user-facing change?**
    No
    
    ### **How was this patch tested?**
    Add new ut
---
 .../shuffle/reader/RssShuffleDataIterator.java     | 13 +++++++++-
 .../org/apache/uniffle/common/RssShuffleUtils.java | 28 ++++++++++++++++++++++
 .../uniffle/common/exception/RssException.java     |  4 ++++
 .../apache/uniffle/common/RssShuffleUtilsTest.java | 25 +++++++++++++++++++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 775285c..bd8184c 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -38,6 +38,7 @@ import scala.collection.Iterator;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.common.RssShuffleUtils;
+import org.apache.uniffle.common.exception.RssException;
 
 public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {
 
@@ -54,6 +55,7 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
   private DeserializationStream deserializationStream = null;
   private ByteBufInputStream byteBufInputStream = null;
   private long unCompressionLength = 0;
+  private ByteBuffer uncompressedData;
 
   public RssShuffleDataIterator(
       Serializer serializer,
@@ -106,8 +108,17 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (compressedData != null) {
         shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
+        // Direct ByteBuffers that are not collected in time can easily cause the executor to
+        // be killed by cluster managers (such as YARN) for using too much off-heap memory.
+        if (uncompressedData != null && uncompressedData.isDirect()) {
+          try {
+            RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
+          } catch (Exception e) {
+            throw new RssException("Destroy DirectByteBuffer failed!", e);
+          }
+        }
         long startDecompress = System.currentTimeMillis();
-        ByteBuffer uncompressedData = RssShuffleUtils.decompressData(
+        uncompressedData = RssShuffleUtils.decompressData(
             compressedData, compressedBlock.getUncompressLength());
         unCompressionLength += compressedBlock.getUncompressLength();
         long decompressDuration = System.currentTimeMillis() - startDecompress;
diff --git a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
index 15b2c7c..58db058 100644
--- a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
@@ -17,8 +17,11 @@
 
 package org.apache.uniffle.common;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
+import com.google.common.base.Preconditions;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
@@ -56,4 +59,29 @@ public class RssShuffleUtils {
     fastDecompressor.decompress(data, data.position(), uncompressData, 0, uncompressLength);
     return uncompressData;
   }
+  
+  /**
+   * Eagerly releases the native memory behind a direct {@link ByteBuffer}.
+   *
+   * <p>DirectByteBuffers are garbage collected by using a phantom reference and a
+   * reference queue. Every once in a while, the JVM checks the reference queue and
+   * cleans the DirectByteBuffers. However, as this doesn't happen
+   * immediately after discarding all references to a DirectByteBuffer, it's
+   * easy to run into an OutOfMemoryError when using DirectByteBuffers. This function
+   * explicitly calls the Cleaner method of a DirectByteBuffer via reflection.
+   *
+   * <p>If the buffer's {@code cleaner()} returns {@code null} the call is a no-op.
+   * NOTE(review): this is presumably the case for buffers that do not own their
+   * memory (e.g. slices or duplicates) - confirm against the target JDK.
+   *
+   * <p>The buffer should not be accessed after this method returns, because its
+   * backing memory may already have been freed.
+   *
+   * @param toBeDestroyed
+   *          The DirectByteBuffer that will be "cleaned". Must be direct.
+   * @throws IllegalArgumentException if {@code toBeDestroyed} is not a direct buffer
+   * @throws NoSuchMethodException if no public {@code cleaner()} / {@code clean()}
+   *           method can be found on the buffer or on its cleaner
+   * @throws IllegalAccessException if a reflective call is denied
+   * @throws InvocationTargetException if {@code cleaner()} or {@code clean()} itself throws
+   * @throws SecurityException if a security manager rejects the reflective access
+   */
+  public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
+          throws IllegalArgumentException, IllegalAccessException,
+          InvocationTargetException, SecurityException, NoSuchMethodException {
+    Preconditions.checkArgument(toBeDestroyed.isDirect(),
+            "toBeDestroyed isn't direct!");
+    // The concrete buffer class is not public, so the method must be made accessible
+    // before it can be invoked reflectively.
+    Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
+    cleanerMethod.setAccessible(true);
+    Object cleaner = cleanerMethod.invoke(toBeDestroyed);
+    if (cleaner == null) {
+      // No cleaner is attached to this buffer, so there is nothing to release here;
+      // dereferencing it would only raise a NullPointerException.
+      return;
+    }
+    Method cleanMethod = cleaner.getClass().getMethod("clean");
+    cleanMethod.setAccessible(true);
+    cleanMethod.invoke(cleaner);
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
index e5c4e40..93a7369 100644
--- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
+++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
@@ -23,4 +23,8 @@ public class RssException extends RuntimeException {
   public RssException(String message) {
     super(message);
   }
+
+  /**
+   * Creates an exception with a detail message and an underlying cause.
+   *
+   * @param message description of the failure
+   * @param e the throwable handed to the superclass constructor as the cause
+   */
+  public RssException(String message, Throwable e) {
+    super(message, e);
+  }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
index c348816..781b2d5 100644
--- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
@@ -20,10 +20,12 @@ package org.apache.uniffle.common;
 import java.nio.ByteBuffer;
 import org.apache.commons.lang3.RandomUtils;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RssShuffleUtilsTest {
 
@@ -46,4 +48,27 @@ public class RssShuffleUtilsTest {
     assertArrayEquals(data, buffer2);
   }
 
+  // Fills a direct buffer with a marker byte, destroys it through
+  // RssShuffleUtils.destroyDirectByteBuffer, then reads it back and expects at least
+  // one byte to differ from the marker.
+  // NOTE(review): the read happens after the buffer has been destroyed; whether the old
+  // bytes are still visible presumably depends on the JVM/allocator - confirm this
+  // assertion is stable on all supported platforms.
+  @Test
+  public void testDestroyDirectByteBuffer() throws Exception {
+    int size = 10;
+    byte b = 1;
+    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
+    // Write the marker value into every position of the buffer.
+    for (int i = 0; i < size; i++) {
+      byteBuffer.put(b);
+    }
+    // Rewind to position 0 so the later get(read) starts at the first written byte.
+    byteBuffer.flip();
+    RssShuffleUtils.destroyDirectByteBuffer(byteBuffer);
+    // The memory may not be released fast enough.
+    Thread.sleep(200);
+    boolean same = true;
+    byte[] read = new byte[size];
+    byteBuffer.get(read);
+    // Look for any byte that no longer equals the marker value.
+    for (byte br : read) {
+      if (b != br) {
+        same = false;
+        break;
+      }
+    }
+    // Passes only when at least one byte read back differs from the marker.
+    assertTrue(!same);
+  }
 }