Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/07/29 03:49:39 UTC
[incubator-uniffle] branch master updated: [Improvement] ShuffleBlock should be released when finished reading (#74)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ccb39ed [Improvement] ShuffleBlock should be released when finished reading (#74)
ccb39ed is described below
commit ccb39ed95fa7cd8742ce323eb7096ddd1a09827b
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Jul 29 11:49:34 2022 +0800
[Improvement] ShuffleBlock should be released when finished reading (#74)
### **What changes were proposed in this pull request?**
Release the ShuffleBlock when it has finished being read.
### **Why are the changes needed?**
We found that Spark executors are easily killed by YARN, because the executor uses too much off-heap memory when reading shuffle data.
Most of that off-heap memory is used to store uncompressed shuffle data, and this memory is released only when a GC is triggered.
### **Does this PR introduce any user-facing change?**
No
### **How was this patch tested?**
Added a new unit test.
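For illustration only (not part of this change set): a minimal, self-contained sketch of the pattern the commit applies in `RssShuffleDataIterator`: release the previous block's direct buffer before allocating the next one, using the same reflective Cleaner call that `destroyDirectByteBuffer` below performs. The 8 MB block size and loop count are made up.

```java
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

public class OffHeapReleaseSketch {
  // Same reflective Cleaner call that RssShuffleUtils.destroyDirectByteBuffer
  // makes: fetch the buffer's cleaner and invoke clean() so the off-heap
  // memory is freed immediately instead of waiting for a GC cycle.
  static void releaseDirect(ByteBuffer buffer) throws Exception {
    Method cleanerMethod = buffer.getClass().getMethod("cleaner");
    cleanerMethod.setAccessible(true);
    Object cleaner = cleanerMethod.invoke(buffer);
    Method cleanMethod = cleaner.getClass().getMethod("clean");
    cleanMethod.setAccessible(true);
    cleanMethod.invoke(cleaner);
  }

  public static void main(String[] args) throws Exception {
    ByteBuffer previous = null;
    for (int i = 0; i < 1024; i++) {
      // Each iteration stands in for decompressing one shuffle block into a
      // new direct buffer. Without the explicit release, the dead buffers
      // only disappear when GC runs, so off-heap usage keeps climbing and a
      // cluster manager such as YARN may kill the executor.
      if (previous != null && previous.isDirect()) {
        releaseDirect(previous);
      }
      previous = ByteBuffer.allocateDirect(8 * 1024 * 1024);
    }
  }
}
```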
---
.../shuffle/reader/RssShuffleDataIterator.java | 13 +++++++++-
.../org/apache/uniffle/common/RssShuffleUtils.java | 28 ++++++++++++++++++++++
.../uniffle/common/exception/RssException.java | 4 ++++
.../apache/uniffle/common/RssShuffleUtilsTest.java | 25 +++++++++++++++++++
4 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 775285c..bd8184c 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -38,6 +38,7 @@ import scala.collection.Iterator;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.RssShuffleUtils;
+import org.apache.uniffle.common.exception.RssException;
public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {
@@ -54,6 +55,7 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
private DeserializationStream deserializationStream = null;
private ByteBufInputStream byteBufInputStream = null;
private long unCompressionLength = 0;
+ private ByteBuffer uncompressedData;
public RssShuffleDataIterator(
Serializer serializer,
@@ -106,8 +108,17 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (compressedData != null) {
shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
+ // DirectByteBuffers that are not collected in time can cause the executor to
+ // be killed by cluster managers (such as YARN) for using too much off-heap memory
+ if (uncompressedData != null && uncompressedData.isDirect()) {
+ try {
+ RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
+ } catch (Exception e) {
+ throw new RssException("Destroy DirectByteBuffer failed!", e);
+ }
+ }
long startDecompress = System.currentTimeMillis();
- ByteBuffer uncompressedData = RssShuffleUtils.decompressData(
+ uncompressedData = RssShuffleUtils.decompressData(
compressedData, compressedBlock.getUncompressLength());
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
diff --git a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
index 15b2c7c..58db058 100644
--- a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java
@@ -17,8 +17,11 @@
package org.apache.uniffle.common;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import com.google.common.base.Preconditions;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
@@ -56,4 +59,29 @@ public class RssShuffleUtils {
fastDecompressor.decompress(data, data.position(), uncompressData, 0, uncompressLength);
return uncompressData;
}
+
+ /**
+ * DirectByteBuffers are garbage collected by using a phantom reference and a
+ * reference queue. Every once in a while, the JVM checks the reference queue and
+ * cleans the DirectByteBuffers. However, as this doesn't happen
+ * immediately after discarding all references to a DirectByteBuffer, it's
+ * easy to run into an OutOfMemoryError while using DirectByteBuffers. This function
+ * explicitly calls the Cleaner method of a DirectByteBuffer.
+ *
+ * @param toBeDestroyed
+ * The DirectByteBuffer that will be "cleaned". Utilizes reflection.
+ *
+ */
+ public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
+ throws IllegalArgumentException, IllegalAccessException,
+ InvocationTargetException, SecurityException, NoSuchMethodException {
+ Preconditions.checkArgument(toBeDestroyed.isDirect(),
+ "toBeDestroyed isn't direct!");
+ Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
+ cleanerMethod.setAccessible(true);
+ Object cleaner = cleanerMethod.invoke(toBeDestroyed);
+ Method cleanMethod = cleaner.getClass().getMethod("clean");
+ cleanMethod.setAccessible(true);
+ cleanMethod.invoke(cleaner);
+ }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
index e5c4e40..93a7369 100644
--- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
+++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
@@ -23,4 +23,8 @@ public class RssException extends RuntimeException {
public RssException(String message) {
super(message);
}
+
+ public RssException(String message, Throwable e) {
+ super(message, e);
+ }
}
diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
index c348816..781b2d5 100644
--- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java
@@ -20,10 +20,12 @@ package org.apache.uniffle.common;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RssShuffleUtilsTest {
@@ -46,4 +48,27 @@ public class RssShuffleUtilsTest {
assertArrayEquals(data, buffer2);
}
+ @Test
+ public void testDestroyDirectByteBuffer() throws Exception {
+ int size = 10;
+ byte b = 1;
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
+ for (int i = 0; i < size; i++) {
+ byteBuffer.put(b);
+ }
+ byteBuffer.flip();
+ RssShuffleUtils.destroyDirectByteBuffer(byteBuffer);
+ // The memory may not be released fast enough.
+ Thread.sleep(200);
+ boolean same = true;
+ byte[] read = new byte[size];
+ byteBuffer.get(read);
+ for (byte br : read) {
+ if (b != br) {
+ same = false;
+ break;
+ }
+ }
+ assertTrue(!same);
+ }
}