You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/17 08:55:10 UTC
[incubator-celeborn] branch main updated: [CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b18042d08 [CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)
b18042d08 is described below
commit b18042d08e294fa1d7858af18362849e5b1f6a97
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Fri Mar 17 16:55:05 2023 +0800
[CELEBORN-435][FLINK] Code refine for RemoteShuffleInputGate (#1358)
---
.../plugin/flink/RemoteShuffleInputGate.java | 55 +++++++++++-----------
1 file changed, 28 insertions(+), 27 deletions(-)
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 1192125cf..7df4a4a28 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -98,13 +98,13 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
private final List<RemoteBufferStreamReader> bufferReaders = new ArrayList<>();
private final List<InputChannelInfo> channelsInfo;
/** Map from channel index to reader index (the position in bufferReaders). */
- private final int[] clientIndexMap;
+ private final int[] channelIndexToReaderIndex;
/** Map from reader index (the position in bufferReaders) to channel index. */
- private final int[] channelIndexMap;
+ private final int[] readerIndexToChannelIndex;
/** The number of subpartitions that have not been consumed yet, per channel. */
- private final int[] numSubPartitionsHasNotConsumed;
+ private final int[] numSubPartitionsNotConsumed;
/** The overall number of subpartitions that have not been consumed. */
private long numUnconsumedSubpartitions;
@@ -147,9 +147,9 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
this.bufferPoolFactory = bufferPoolFactory;
int numChannels = gateDescriptor.getShuffleDescriptors().length;
- this.clientIndexMap = new int[numChannels];
- this.channelIndexMap = new int[numChannels];
- this.numSubPartitionsHasNotConsumed = new int[numChannels];
+ this.channelIndexToReaderIndex = new int[numChannels];
+ this.readerIndexToChannelIndex = new int[numChannels];
+ this.numSubPartitionsNotConsumed = new int[numChannels];
this.bufferDecompressor = bufferDecompressor;
RemoteShuffleDescriptor remoteShuffleDescriptor =
@@ -161,12 +161,11 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
celebornConf,
new UserIdentifier("default", "default"));
- this.numUnconsumedSubpartitions = initShuffleReadClients();
- this.pendingEndOfDataEvents = numUnconsumedSubpartitions;
- this.channelsInfo = createChannelInfos();
+ initShuffleReadClients();
+ channelsInfo = createChannelInfos();
}
- private long initShuffleReadClients() {
+ private void initShuffleReadClients() {
int startSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
int endSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
int numSubpartitionsPerChannel = endSubIdx - startSubIdx + 1;
@@ -178,7 +177,7 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
.mapToObj(i -> Pair.of(i, gateDescriptor.getShuffleDescriptors()[i]))
.collect(Collectors.toList());
- int clientIndex = 0;
+ int readerIndex = 0;
for (Pair<Integer, ShuffleDescriptor> descriptor : descriptors) {
RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
ShuffleResourceDescriptor shuffleDescriptor =
@@ -199,13 +198,15 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
getFailureListener(remoteDescriptor.getResultPartitionID()));
bufferReaders.add(reader);
- numSubPartitionsHasNotConsumed[descriptor.getLeft()] = numSubpartitionsPerChannel;
+ numSubPartitionsNotConsumed[descriptor.getLeft()] = numSubpartitionsPerChannel;
numUnconsumedSubpartitions += numSubpartitionsPerChannel;
- clientIndexMap[descriptor.getLeft()] = clientIndex;
- channelIndexMap[clientIndex] = descriptor.getLeft();
- ++clientIndex;
+ channelIndexToReaderIndex[descriptor.getLeft()] = readerIndex;
+ readerIndexToChannelIndex[readerIndex] = descriptor.getLeft();
+ ++readerIndex;
}
- return numUnconsumedSubpartitions;
+
+ this.numUnconsumedSubpartitions = numUnconsumedSubpartitions;
+ this.pendingEndOfDataEvents = numUnconsumedSubpartitions;
}
/** Setup gate and build network connections. */
@@ -303,9 +304,9 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
Throwable closeException = null;
// Do not check closed flag, thus to allow calling this method from both task thread and
// cancel thread.
- for (RemoteBufferStreamReader shuffleReadClient : bufferReaders) {
+ for (RemoteBufferStreamReader reader : bufferReaders) {
try {
- shuffleReadClient.close();
+ reader.close();
} catch (Throwable throwable) {
closeException = closeException == null ? throwable : closeException;
LOG.error("Failed to close shuffle read client.", throwable);
@@ -364,7 +365,7 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
/** Try to open more readers to {@link #numConcurrentReading}. */
private void tryOpenSomeChannels() throws IOException {
- List<RemoteBufferStreamReader> clientsToOpen = new ArrayList<>();
+ List<RemoteBufferStreamReader> readersToOpen = new ArrayList<>();
synchronized (lock) {
if (closed) {
@@ -379,25 +380,25 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
"Trying reader: {}, isOpened={}, numSubPartitionsHasNotConsumed={}.",
bufferStreamReader,
bufferStreamReader.isOpened(),
- numSubPartitionsHasNotConsumed[channelIndexMap[i]]);
+ numSubPartitionsNotConsumed[readerIndexToChannelIndex[i]]);
if (numOnGoing >= numConcurrentReading) {
break;
}
if (bufferStreamReader.isOpened()
- && numSubPartitionsHasNotConsumed[channelIndexMap[i]] > 0) {
+ && numSubPartitionsNotConsumed[readerIndexToChannelIndex[i]] > 0) {
numOnGoing++;
continue;
}
if (!bufferStreamReader.isOpened()) {
- clientsToOpen.add(bufferStreamReader);
+ readersToOpen.add(bufferStreamReader);
numOnGoing++;
}
}
}
- for (RemoteBufferStreamReader reader : clientsToOpen) {
+ for (RemoteBufferStreamReader reader : readersToOpen) {
reader.open(0);
}
}
@@ -531,16 +532,16 @@ public class RemoteShuffleInputGate extends IndexedInputGate {
}
if (event.getClass() == EndOfPartitionEvent.class) {
checkState(
- numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()] > 0,
+ numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()] > 0,
"BUG -- EndOfPartitionEvent received repeatedly.");
- numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()]--;
+ numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()]--;
numUnconsumedSubpartitions--;
// not the real end.
- if (numSubPartitionsHasNotConsumed[channelInfo.getInputChannelIdx()] != 0) {
+ if (numSubPartitionsNotConsumed[channelInfo.getInputChannelIdx()] != 0) {
return Optional.empty();
} else {
// the real end.
- bufferReaders.get(clientIndexMap[channelInfo.getInputChannelIdx()]).close();
+ bufferReaders.get(channelIndexToReaderIndex[channelInfo.getInputChannelIdx()]).close();
// tryOpenSomeChannels();
if (allReadersEOF()) {
availabilityHelper.getUnavailableToResetAvailable().complete(null);