Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/07/14 12:51:57 UTC

[incubator-celeborn] branch main updated: [CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a7bbbd05c [CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf
a7bbbd05c is described below

commit a7bbbd05c4548c8b3d9d60c7ab1b12b152fa20c5
Author: zky.zhoukeyong <zk...@alibaba-inc.com>
AuthorDate: Fri Jul 14 20:51:50 2023 +0800

    [CELEBORN-797] Decrease writeTime metric sampling frequency to improve perf
    
    ### What changes were proposed in this pull request?
    1. Decrease writeTime metric sampling frequency to improve perf
    2. Set default value of ```celeborn.<module>.push.timeoutCheck.threads``` and ```celeborn.<module>.fetch.timeoutCheck.threads``` to 4
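    
    In effect, the change moves the `System.nanoTime()` pair out of the per-record hot loop: the clock was previously read twice for every record, and is now read only around the comparatively rare buffer flush / push calls, as the hunks below show. A minimal standalone sketch of the idea (the class and method names here are illustrative stand-ins, not the real Celeborn/Spark classes):
    
    ```java
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class WriteTimeSampling {
      // Illustrative stand-in for Spark's ShuffleWriteMetrics.incWriteTime().
      static final AtomicLong writeTimeNanos = new AtomicLong();
    
      // Before: two clock reads for every record -- measurable overhead in a hot loop.
      static void writeTimedPerRecord(Iterator<byte[]> records) {
        long sum = 0L;
        while (records.hasNext()) {
          long start = System.nanoTime();
          bufferRecord(records.next());
          sum += System.nanoTime() - start;
        }
        writeTimeNanos.addAndGet(sum);
      }
    
      // After: the loop body is untimed; only the infrequent flush is measured.
      static void writeTimedPerFlush(Iterator<byte[]> records) {
        while (records.hasNext()) {
          bufferRecord(records.next());
        }
      }
    
      static void flush(byte[] buffer) {
        long start = System.nanoTime();
        push(buffer);
        writeTimeNanos.addAndGet(System.nanoTime() - start);
      }
    
      // Buffering and the pusher hand-off are elided in this sketch.
      private static void bufferRecord(byte[] record) {}
      private static void push(byte[] buffer) {}
    }
    ```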
    
    ### Why are the changes needed?
    The following are the test cases:
    case 1: ```spark.sparkContext.parallelize(1 to 8000, 8000).flatMap( _ => (1 to 15000000).iterator.map(num => num)).repartition(8000).count``` // shuffle 1.1T data
    case 2: ```spark.sparkContext.parallelize(1 to 8000, 8000).flatMap( _ => (1 to 30000000).iterator.map(num => num)).repartition(8000).count``` // shuffle 2.2T data
    The following are the end-to-end times of the shuffle write stage:
    
    |        | Sort pusher before | Sort pusher after | Hash pusher before | Hash pusher after |
    |--------|--------------------|-------------------|--------------------|-------------------|
    | case 1 | 4.4min             | 4.1min            | 4.4min             | 3.9min            |
    | case 2 | 9.1min             | 8.4min            | 9.7min             | 8.5min            |
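    
    If a deployment needs the old pool sizes back, both thread counts remain configurable; `4` is only the new default. A sketch for a Spark client, assuming the usual `spark.` prefix convention for passing Celeborn settings through Spark and `<module>` resolved to `data` (the `replicate`-side pool is configured on the worker instead):
    
    ```java
    import org.apache.spark.SparkConf;
    
    public class RestoreTimeoutCheckThreads {
      public static SparkConf conf() {
        return new SparkConf()
            // Restore the pre-change pool size of 16; the new default is 4.
            .set("spark.celeborn.data.push.timeoutCheck.threads", "16")
            .set("spark.celeborn.data.fetch.timeoutCheck.threads", "16");
      }
    }
    ```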
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GA and manual tests.
    
    Closes #1718 from waitinfuture/797.
    
    Authored-by: zky.zhoukeyong <zk...@alibaba-inc.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../spark/shuffle/celeborn/HashBasedShuffleWriter.java  | 10 ++--------
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java  | 10 ++--------
 .../spark/shuffle/celeborn/HashBasedShuffleWriter.java  | 17 ++++-------------
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java  | 10 ++--------
 .../scala/org/apache/celeborn/common/CelebornConf.scala |  4 ++--
 docs/configuration/network.md                           |  4 ++--
 6 files changed, 14 insertions(+), 41 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index c53eab9d4..e79524fb6 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -188,7 +188,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
 
     SQLMetric dataSize =
         SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) dep.serializer());
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -200,7 +199,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         dataSize.add(serializedRecordSize);
       }
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -223,16 +221,13 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
             rowSize);
         sendOffsets[partitionId] = offset + serializedRecordSize;
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void write0(scala.collection.Iterator iterator) throws IOException, InterruptedException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -245,7 +240,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
@@ -254,10 +248,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         System.arraycopy(serBuffer.getBuf(), 0, buffer, offset, serializedRecordSize);
         sendOffsets[partitionId] = offset + serializedRecordSize;
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private byte[] getOrCreateBuffer(int partitionId) {
@@ -311,8 +303,10 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
 
   private void flushSendBuffer(int partitionId, byte[] buffer, int size)
       throws IOException, InterruptedException {
+    long start = System.nanoTime();
     logger.debug("Flush buffer for partition {}, size {}.", partitionId, size);
     dataPusher.addTask(partitionId, buffer, size);
+    writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void close() throws IOException, InterruptedException {
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 1ea01ff53..03f6f8626 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -195,7 +195,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     SQLMetric dataSize =
         SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) dep.serializer());
 
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -207,7 +206,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         dataSize.add(serializedRecordSize);
       }
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -232,13 +230,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
           }
         }
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushAndSwitch() throws IOException {
+    long start = System.nanoTime();
     if (pipelined) {
       currentPusher.triggerPush();
       currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
@@ -246,6 +243,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     } else {
       currentPusher.pushData();
     }
+    writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void updatePeakMemoryUsed() {
@@ -279,7 +277,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   private void write0(scala.collection.Iterator iterator) throws IOException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -292,7 +289,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
@@ -317,10 +313,8 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
           }
         }
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 07ad27a1b..349629edf 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.client.write.DataPusher;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.Utils;
 
 @Private
 public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -223,7 +224,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
 
     SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -246,7 +246,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         celebornBatchBuilders[partitionId] = columnBuilders;
       }
 
-      long insertAndPushStartTime = System.nanoTime();
       celebornBatchBuilders[partitionId].writeRow(row);
       if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
         byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
@@ -256,10 +255,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         }
         celebornBatchBuilders[partitionId].newBuilders();
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void fastWrite0(scala.collection.Iterator iterator)
@@ -267,7 +264,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
 
     SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -280,7 +276,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         dataSize.add(rowSize);
       }
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -303,16 +298,13 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
             rowSize);
         sendOffsets[partitionId] = offset + serializedRecordSize;
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void write0(scala.collection.Iterator iterator) throws IOException, InterruptedException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -325,7 +317,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > PUSH_BUFFER_MAX_SIZE) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
@@ -334,10 +325,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         System.arraycopy(serBuffer.getBuf(), 0, buffer, offset, serializedRecordSize);
         sendOffsets[partitionId] = offset + serializedRecordSize;
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private byte[] getOrCreateBuffer(int partitionId) {
@@ -391,8 +380,10 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
 
   private void flushSendBuffer(int partitionId, byte[] buffer, int size)
       throws IOException, InterruptedException {
-    logger.debug("Flush buffer, size {}.", size);
+    long start = System.nanoTime();
+    logger.debug("Flush buffer, size {}.", Utils.bytesToString(size));
     dataPusher.addTask(partitionId, buffer, size);
+    writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void closeColumnarWrite() throws IOException {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 017b006d6..a6d4ff5e4 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -242,7 +242,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
 
     SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -255,7 +254,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         dataSize.add(serializedRecordSize);
       }
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -280,13 +278,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
           }
         }
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushAndSwitch() throws IOException {
+    long start = System.nanoTime();
     if (pipelined) {
       currentPusher.triggerPush();
       currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
@@ -294,12 +291,12 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     } else {
       currentPusher.pushData();
     }
+    writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void write0(scala.collection.Iterator iterator) throws IOException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
-    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -312,7 +309,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
-      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
@@ -337,10 +333,8 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
           }
         }
       }
-      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
-    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 17fe8e8d7..d36fc616e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1447,7 +1447,7 @@ object CelebornConf extends Logging {
         s"it works for worker replicate data to peer worker and should be configured on worker side.")
       .version("0.3.0")
       .intConf
-      .createWithDefault(16)
+      .createWithDefault(4)
 
   val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
@@ -1467,7 +1467,7 @@ object CelebornConf extends Logging {
         s"since it works for shuffle client fetch data and should be configured on client side.")
       .version("0.3.0")
       .intConf
-      .createWithDefault(16)
+      .createWithDefault(4)
 
   val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.<module>.heartbeat.interval")
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 6cde65c5b..cc66b985b 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -20,7 +20,7 @@ license: |
 | Key | Default | Description | Since |
 | --- | ------- | ----------- | ----- |
 | celeborn.&lt;module&gt;.fetch.timeoutCheck.interval | 5s | Interval for checking fetch data timeout. It only supports setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 | 
-| celeborn.&lt;module&gt;.fetch.timeoutCheck.threads | 16 | Threads num for checking fetch data timeout. It only supports setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 | 
+| celeborn.&lt;module&gt;.fetch.timeoutCheck.threads | 4 | Threads num for checking fetch data timeout. It only supports setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 | 
 | celeborn.&lt;module&gt;.heartbeat.interval | 60s | The heartbeat interval between worker and client. If setting <module> to `data`, it works for shuffle client push and fetch data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | 
 | celeborn.&lt;module&gt;.io.backLog | 0 | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. |  | 
 | celeborn.&lt;module&gt;.io.clientThreads | 0 | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. |  | 
@@ -37,7 +37,7 @@ license: |
 | celeborn.&lt;module&gt;.io.sendBuffer | 0b | Send buffer size (SO_SNDBUF). | 0.2.0 | 
 | celeborn.&lt;module&gt;.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. |  | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | 
-| celeborn.&lt;module&gt;.push.timeoutCheck.threads | 16 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | 
+| celeborn.&lt;module&gt;.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | 
 | celeborn.network.bind.preferIpAddress | true | When `true`, prefer to use IP address, otherwise FQDN. This configuration only takes effect when the bind hostname is not set explicitly, in which case Celeborn will find the first non-loopback address to bind. | 0.3.0 | 
 | celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 | 
 | celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |