You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink "here") was not preserved in this plain-text rendering.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/17 08:55:10 UTC

[incubator-celeborn] branch main updated: [CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b18042d08 [CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)
b18042d08 is described below

commit b18042d08e294fa1d7858af18362849e5b1f6a97
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Fri Mar 17 16:55:05 2023 +0800

    [CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)
---
 .../plugin/flink/RemoteShuffleInputGate.java       | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 1192125cf..7df4a4a28 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -98,13 +98,13 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
   private final List<RemoteBufferStreamReader> bufferReaders = new ArrayList<>();
   private final List<InputChannelInfo> channelsInfo;
   /** Map from channel index to shuffle client index. */
-  private final int[] clientIndexMap;
+  private final int[] channelIndexToReaderIndex;
 
   /** Map from shuffle client index to channel index. */
-  private final int[] channelIndexMap;
+  private final int[] readerIndexToChannelIndex;
 
   /** The number of subpartitions that has not consumed per channel. */
-  private final int[] numSubPartitionsHasNotConsumed;
+  private final int[] numSubPartitionsNotConsumed;
 
   /** The overall number of subpartitions that has not been consumed. */
   private long numUnconsumedSubpartitions;
@@ -147,9 +147,9 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
     this.bufferPoolFactory = bufferPoolFactory;
 
     int numChannels = gateDescriptor.getShuffleDescriptors().length;
-    this.clientIndexMap = new int[numChannels];
-    this.channelIndexMap = new int[numChannels];
-    this.numSubPartitionsHasNotConsumed = new int[numChannels];
+    this.channelIndexToReaderIndex = new int[numChannels];
+    this.readerIndexToChannelIndex = new int[numChannels];
+    this.numSubPartitionsNotConsumed = new int[numChannels];
     this.bufferDecompressor = bufferDecompressor;
 
     RemoteShuffleDescriptor remoteShuffleDescriptor =
@@ -161,12 +161,11 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
             celebornConf,
             new UserIdentifier("default", "default"));
 
-    this.numUnconsumedSubpartitions = initShuffleReadClients();
-    this.pendingEndOfDataEvents = numUnconsumedSubpartitions;
-    this.channelsInfo = createChannelInfos();
+    initShuffleReadClients();
+    channelsInfo = createChannelInfos();
   }
 
-  private long initShuffleReadClients() {
+  private void initShuffleReadClients() {
     int startSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
     int endSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
     int numSubpartitionsPerChannel = endSubIdx - startSubIdx + 1;
@@ -178,7 +177,7 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
             .mapToObj(i -> Pair.of(i, gateDescriptor.getShuffleDescriptors()[i]))
             .collect(Collectors.toList());
 
-    int clientIndex = 0;
+    int readerIndex = 0;
     for (Pair<Integer, ShuffleDescriptor> descriptor : descriptors) {
       RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
       ShuffleResourceDescriptor shuffleDescriptor =
@@ -199,13 +198,15 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
               getFailureListener(remoteDescriptor.getResultPartitionID()));
 
       bufferReaders.add(reader);
-      numSubPartitionsHasNotConsumed[descriptor.getLeft()] = numSubpartitionsPerChannel;
+      numSubPartitionsNotConsumed[descriptor.getLeft()] = numSubpartitionsPerChannel;
       numUnconsumedSubpartitions += numSubpartitionsPerChannel;
-      clientIndexMap[descriptor.getLeft()] = clientIndex;
-      channelIndexMap[clientIndex] = descriptor.getLeft();
-      ++clientIndex;
+      channelIndexToReaderIndex[descriptor.getLeft()] = readerIndex;
+      readerIndexToChannelIndex[readerIndex] = descriptor.getLeft();
+      ++readerIndex;
     }
-    return numUnconsumedSubpartitions;
+
+    this.numUnconsumedSubpartitions = numUnconsumedSubpartitions;
+    this.pendingEndOfDataEvents = numUnconsumedSubpartitions;
   }
 
   /** Setup gate and build network connections. */
@@ -303,9 +304,9 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
     Throwable closeException = null;
     // Do not check closed flag, thus to allow calling this method from both task thread and
     // cancel thread.
-    for (RemoteBufferStreamReader shuffleReadClient : bufferReaders) {
+    for (RemoteBufferStreamReader reader : bufferReaders) {
       try {
-        shuffleReadClient.close();
+        reader.close();
       } catch (Throwable throwable) {
         closeException = closeException == null ? throwable : closeException;
         LOG.error("Failed to close shuffle read client.", throwable);
@@ -364,7 +365,7 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
 
   /** Try to open more readers to {@link #numConcurrentReading}. */
   private void tryOpenSomeChannels() throws IOException {
-    List<RemoteBufferStreamReader> clientsToOpen = new ArrayList<>();
+    List<RemoteBufferStreamReader> readersToOpen = new ArrayList<>();
 
     synchronized (lock) {
       if (closed) {
@@ -379,25 +380,25 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
             "Trying reader: {}, isOpened={}, numSubPartitionsHasNotConsumed={}.",
             bufferStreamReader,
             bufferStreamReader.isOpened(),
-            numSubPartitionsHasNotConsumed[channelIndexMap[i]]);
+            numSubPartitionsNotConsumed[readerIndexToChannelIndex[i]]);
         if (numOnGoing >= numConcurrentReading) {
           break;
         }
 
         if (bufferStreamReader.isOpened()
-            && numSubPartitionsHasNotConsumed[channelIndexMap[i]] > 0) {
+            && numSubPartitionsNotConsumed[readerIndexToChannelIndex[i]] > 0) {
           numOnGoing++;
           continue;
         }
 
         if (!bufferStreamReader.isOpened()) {
-          clientsToOpen.add(bufferStreamReader);
+          readersToOpen.add(bufferStreamReader);
           numOnGoing++;
         }
       }
     }
 
-    for (RemoteBufferStreamReader reader : clientsToOpen) {
+    for (RemoteBufferStreamReader reader : readersToOpen) {
       reader.open(0);
     }
   }
@@ -531,16 +532,16 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
     }
     if (event.getClass() == EndOfPartitionEvent.class) {
       checkState(
-          numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()] > 0,
+          numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()] > 0,
           "BUG -- EndOfPartitionEvent received repeatedly.");
-      numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()]--;
+      numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()]--;
       numUnconsumedSubpartitions--;
       // not the real end.
-      if (numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()] != 0) {
+      if (numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()] != 0) {
         return Optional.empty();
       } else {
         // the real end.
-        bufferReaders.get(clientIndexMap[channelInfo.getInputChannelIdx()]).close();
+        bufferReaders.get(channelIndexToReaderIndex[channelInfo.getInputChannelIdx()]).close();
         //         tryOpenSomeChannels();
         if (allReadersEOF()) {
           availabilityHelper.getUnavailableToResetAvailable().complete(null);