You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/07/14 12:51:57 UTC
[incubator-celeborn] branch main updated: [CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a7bbbd05c [CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf
a7bbbd05c is described below
commit a7bbbd05c4548c8b3d9d60c7ab1b12b152fa20c5
Author: zky.zhoukeyong <zk...@alibaba-inc.com>
AuthorDate: Fri Jul 14 20:51:50 2023 +0800
[CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf
### What changes were proposed in this pull request?
1. Decrease writeTime metric sampling frequency to improve perf
2. Set default value of ```celeborn.<module>.push.timeoutCheck.threads``` and ```celeborn.<module>.fetch.timeoutCheck.threads``` to 4
### Why are the changes needed?
Following are the test cases:
case 1: ```spark.sparkContext.parallelize(1 to 8000, 8000).flatMap( _ => (1 to 15000000).iterator.map(num => num)).repartition(8000).count``` // shuffle 1.1T data
case 2: ```spark.sparkContext.parallelize(1 to 8000, 8000).flatMap( _ => (1 to 30000000).iterator.map(num => num)).repartition(8000).count``` // shuffle 2.2T data
Following are the e2e times of the shuffle write stage:
|Case|Sort pusher before|Sort pusher after|Hash pusher before|Hash pusher after|
|----|----|----|----|-----|
|case1|4.4min|4.1min|4.4min|3.9min|
|case2|9.1min|8.4min|9.7min|8.5min|
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed GA and manual tests.
Closes #1718 from waitinfuture/797.
Authored-by: zky.zhoukeyong <zk...@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
.../spark/shuffle/celeborn/HashBasedShuffleWriter.java | 10 ++--------
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 10 ++--------
.../spark/shuffle/celeborn/HashBasedShuffleWriter.java | 17 ++++-------------
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 10 ++--------
.../scala/org/apache/celeborn/common/CelebornConf.scala | 4 ++--
docs/configuration/network.md | 4 ++--
6 files changed, 14 insertions(+), 41 deletions(-)
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index c53eab9d4..e79524fb6 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -188,7 +188,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
SQLMetric dataSize =
SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) dep.serializer());
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -200,7 +199,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(serializedRecordSize);
}
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -223,16 +221,13 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
rowSize);
sendOffsets[partitionId] = offset + serializedRecordSize;
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void write0(scala.collection.Iterator iterator) throws IOException, InterruptedException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -245,7 +240,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
@@ -254,10 +248,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
System.arraycopy(serBuffer.getBuf(), 0, buffer, offset, serializedRecordSize);
sendOffsets[partitionId] = offset + serializedRecordSize;
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private byte[] getOrCreateBuffer(int partitionId) {
@@ -311,8 +303,10 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private void flushSendBuffer(int partitionId, byte[] buffer, int size)
throws IOException, InterruptedException {
+ long start = System.nanoTime();
logger.debug("Flush buffer for partition {}, size {}.", partitionId, size);
dataPusher.addTask(partitionId, buffer, size);
+ writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void close() throws IOException, InterruptedException {
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 1ea01ff53..03f6f8626 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -195,7 +195,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
SQLMetric dataSize =
SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) dep.serializer());
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -207,7 +206,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(serializedRecordSize);
}
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -232,13 +230,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
}
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushAndSwitch() throws IOException {
+ long start = System.nanoTime();
if (pipelined) {
currentPusher.triggerPush();
currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
@@ -246,6 +243,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
} else {
currentPusher.pushData();
}
+ writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void updatePeakMemoryUsed() {
@@ -279,7 +277,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private void write0(scala.collection.Iterator iterator) throws IOException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -292,7 +289,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
@@ -317,10 +313,8 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
}
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 07ad27a1b..349629edf 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.client.write.DataPusher;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.Utils;
@Private
public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -223,7 +224,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -246,7 +246,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
celebornBatchBuilders[partitionId] = columnBuilders;
}
- long insertAndPushStartTime = System.nanoTime();
celebornBatchBuilders[partitionId].writeRow(row);
if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
@@ -256,10 +255,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
celebornBatchBuilders[partitionId].newBuilders();
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void fastWrite0(scala.collection.Iterator iterator)
@@ -267,7 +264,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -280,7 +276,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(rowSize);
}
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -303,16 +298,13 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
rowSize);
sendOffsets[partitionId] = offset + serializedRecordSize;
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void write0(scala.collection.Iterator iterator) throws IOException, InterruptedException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -325,7 +317,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
@@ -334,10 +325,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
System.arraycopy(serBuffer.getBuf(), 0, buffer, offset, serializedRecordSize);
sendOffsets[partitionId] = offset + serializedRecordSize;
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private byte[] getOrCreateBuffer(int partitionId) {
@@ -391,8 +380,10 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private void flushSendBuffer(int partitionId, byte[] buffer, int size)
throws IOException, InterruptedException {
- logger.debug("Flush buffer, size {}.", size);
+ long start = System.nanoTime();
+ logger.debug("Flush buffer, size {}.", Utils.bytesToString(size));
dataPusher.addTask(partitionId, buffer, size);
+ writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void closeColumnarWrite() throws IOException {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 017b006d6..a6d4ff5e4 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -242,7 +242,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -255,7 +254,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(serializedRecordSize);
}
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -280,13 +278,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
}
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushAndSwitch() throws IOException {
+ long start = System.nanoTime();
if (pipelined) {
currentPusher.triggerPush();
currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
@@ -294,12 +291,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
} else {
currentPusher.pushData();
}
+ writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void write0(scala.collection.Iterator iterator) throws IOException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
- long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -312,7 +309,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
- long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
@@ -337,10 +333,8 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
}
}
- shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
- writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 17fe8e8d7..d36fc616e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1447,7 +1447,7 @@ object CelebornConf extends Logging {
s"it works for worker replicate data to peer worker and should be configured on worker side.")
.version("0.3.0")
.intConf
- .createWithDefault(16)
+ .createWithDefault(4)
val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
@@ -1467,7 +1467,7 @@ object CelebornConf extends Logging {
s"since it works for shuffle client fetch data and should be configured on client side.")
.version("0.3.0")
.intConf
- .createWithDefault(16)
+ .createWithDefault(4)
val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.heartbeat.interval")
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 6cde65c5b..cc66b985b 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -20,7 +20,7 @@ license: |
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.<module>.fetch.timeoutCheck.interval | 5s | Interval for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 |
-| celeborn.<module>.fetch.timeoutCheck.threads | 16 | Threads num for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 |
+| celeborn.<module>.fetch.timeoutCheck.threads | 4 | Threads num for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 |
| celeborn.<module>.heartbeat.interval | 60s | The heartbeat interval between worker and client. If setting <module> to `data`, it works for shuffle client push and fetch data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
| celeborn.<module>.io.backLog | 0 | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. | |
| celeborn.<module>.io.clientThreads | 0 | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. | |
@@ -37,7 +37,7 @@ license: |
| celeborn.<module>.io.sendBuffer | 0b | Send buffer size (SO_SNDBUF). | 0.2.0 |
| celeborn.<module>.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. | |
| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
-| celeborn.<module>.push.timeoutCheck.threads | 16 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
+| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
| celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 |
| celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 |
| celeborn.network.memory.allocator.numArenas | <undefined> | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |