You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "RexXiong (via GitHub)" <gi...@apache.org> on 2023/02/17 09:21:58 UTC

[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1244: [CELEBORN-235] Implement flink plugin.

RexXiong commented on code in PR #1244:
URL: https://github.com/apache/incubator-celeborn/pull/1244#discussion_r1109495763


##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/RssBufferStream.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.readclient;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.protocol.*;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;
+
+public class RssBufferStream {
+
+  private static Logger logger = LoggerFactory.getLogger(RssBufferStream.class);
+  private CelebornConf conf;
+  private FlinkTransportClientFactory clientFactory;
+  private String shuffleKey;
+  private PartitionLocation[] locations;
+  private int subIndexStart;
+  private int subIndexEnd;
+  private TransportClient client;
+  private int currentLocationIndex = 0;
+  private long streamId = 0;
+
+  public RssBufferStream() {}
+
+  public RssBufferStream(
+      CelebornConf conf,
+      FlinkTransportClientFactory dataClientFactory,
+      String shuffleKey,
+      PartitionLocation[] locations,
+      int subIndexStart,
+      int subIndexEnd) {
+    this.conf = conf;
+    this.clientFactory = dataClientFactory;
+    this.shuffleKey = shuffleKey;
+    this.locations = locations;
+    this.subIndexStart = subIndexStart;
+    this.subIndexEnd = subIndexEnd;
+  }
+
+  public void open(
+      Supplier<ByteBuf> supplier,
+      int initialCredit,
+      FlinkShuffleClientImpl mapShuffleClient,
+      Consumer<RequestMessage> messageConsumer)
+      throws IOException, InterruptedException {
+    if (locations.length >= 1) {
+      this.client =
+          clientFactory.createClient(
+              locations[currentLocationIndex].getHost(),
+              locations[currentLocationIndex].getFetchPort(),
+              -1,
+              supplier);
+    }
+    OpenStreamWithCredit openBufferStream =

Review Comment:
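   What happens if `locations` is empty? `client` is only created when `locations.length >= 1`, but `locations[currentLocationIndex]` and `client` are then used unconditionally, so an empty array would fail with an index or null-pointer error instead of a clear message.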



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.network.protocol.BacklogAnnouncement;
+import org.apache.celeborn.common.network.protocol.ReadAddCredit;
+import org.apache.celeborn.common.network.protocol.RequestMessage;
+import org.apache.celeborn.plugin.flink.buffer.CreditListener;
+import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
+import org.apache.celeborn.plugin.flink.protocol.ReadData;
+import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.readclient.RssBufferStream;
+
+public class RemoteBufferStreamReader extends CreditListener {
+  private static Logger logger = LoggerFactory.getLogger(RemoteBufferStreamReader.class);
+  private TransferBufferPool bufferPool;
+  private FlinkShuffleClientImpl client;
+  private String applicationId;
+  private int shuffleId;
+  private int partitionId;
+  private int subPartitionIndexStart;
+  private int subPartitionIndexEnd;
+  private boolean isOpen;
+  private Consumer<ByteBuf> dataListener;
+  private Consumer<Throwable> failureListener;
+  private RssBufferStream bufferStream;
+  private boolean closed = false;
+  private Consumer<RequestMessage> messageConsumer;
+
+  public RemoteBufferStreamReader(
+      FlinkShuffleClientImpl client,
+      ShuffleResourceDescriptor shuffleDescriptor,
+      String applicationId,
+      int startSubIdx,
+      int endSubIdx,
+      TransferBufferPool bufferPool,
+      Consumer<ByteBuf> dataListener,
+      Consumer<Throwable> failureListener) {
+    this.client = client;
+    this.applicationId = applicationId;
+    this.shuffleId = shuffleDescriptor.getShuffleId();
+    this.partitionId = shuffleDescriptor.getPartitionId();
+    this.bufferPool = bufferPool;
+    this.subPartitionIndexStart = startSubIdx;
+    this.subPartitionIndexEnd = endSubIdx;
+    this.dataListener = dataListener;
+    this.failureListener = failureListener;
+    this.messageConsumer =
+        requestMessage -> {
+          if (requestMessage instanceof ReadData) {
+            dataReceived((ReadData) requestMessage);
+          } else if (requestMessage instanceof BacklogAnnouncement) {
+            backlogReceived(((BacklogAnnouncement) requestMessage).getBacklog());
+          }
+        };
+  }
+
+  public void open(int initialCredit) throws IOException {
+    try {
+      this.bufferStream =
+          client.readBufferedPartition(
+              applicationId, shuffleId, partitionId, subPartitionIndexStart, subPartitionIndexEnd);
+      bufferStream.open(
+          RemoteBufferStreamReader.this::requestBuffer, initialCredit, client, messageConsumer);
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Failed to openStream.", e);
+    }
+    isOpen = true;
+  }
+
+  public void close() {
+    isOpen = false;
+    client.getReadClientHandler().removeHandler(this.bufferStream.getStreamId());
+  }
+
+  public boolean isOpened() {
+    return isOpen;
+  }
+
+  public void notifyAvailableCredits(int numCredits) {
+    ReadAddCredit addCredit = new ReadAddCredit(this.bufferStream.getStreamId(), numCredits);
+    bufferStream.addCredit(addCredit);
+  }
+
+  public ByteBuf requestBuffer() {
+    return bufferPool.requestBuffer();
+  }
+
+  public void backlogReceived(int backlog) {
+    bufferPool.reserveBuffers(this, backlog);
+  }
+
+  public void dataReceived(ReadData readData) {
+    logger.info(

Review Comment:
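   Please change this log from info to debug level: `dataReceived` is invoked for every `ReadData` message, so logging at info here is too noisy.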



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/RssBufferStream.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.readclient;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.protocol.*;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;
+
+public class RssBufferStream {
+
+  private static Logger logger = LoggerFactory.getLogger(RssBufferStream.class);
+  private CelebornConf conf;
+  private FlinkTransportClientFactory clientFactory;
+  private String shuffleKey;
+  private PartitionLocation[] locations;
+  private int subIndexStart;
+  private int subIndexEnd;
+  private TransportClient client;
+  private int currentLocationIndex = 0;
+  private long streamId = 0;
+
+  public RssBufferStream() {}
+
+  public RssBufferStream(
+      CelebornConf conf,
+      FlinkTransportClientFactory dataClientFactory,
+      String shuffleKey,
+      PartitionLocation[] locations,
+      int subIndexStart,
+      int subIndexEnd) {
+    this.conf = conf;
+    this.clientFactory = dataClientFactory;
+    this.shuffleKey = shuffleKey;
+    this.locations = locations;
+    this.subIndexStart = subIndexStart;
+    this.subIndexEnd = subIndexEnd;
+  }
+
+  public void open(
+      Supplier<ByteBuf> supplier,
+      int initialCredit,
+      FlinkShuffleClientImpl mapShuffleClient,
+      Consumer<RequestMessage> messageConsumer)
+      throws IOException, InterruptedException {
+    if (locations.length >= 1) {
+      this.client =
+          clientFactory.createClient(
+              locations[currentLocationIndex].getHost(),
+              locations[currentLocationIndex].getFetchPort(),
+              -1,
+              supplier);
+    }
+    OpenStreamWithCredit openBufferStream =
+        new OpenStreamWithCredit(
+            shuffleKey,
+            locations[currentLocationIndex].getFileName(),
+            subIndexStart,
+            subIndexEnd,
+            initialCredit);
+    long timeoutMs = conf.fetchTimeoutMs();
+    CountDownLatch latch = new CountDownLatch(1);
+    client.sendRpc(
+        openBufferStream.toByteBuffer(),
+        new RpcResponseCallback() {
+
+          @Override
+          public void onSuccess(ByteBuffer response) {
+            StreamHandle streamHandle = (StreamHandle) Message.decode(response);
+            RssBufferStream.this.streamId = streamHandle.streamId;
+            mapShuffleClient
+                .getReadClientHandler()
+                .registerHandler(streamId, messageConsumer, client);
+            latch.countDown();
+          }
+
+          @Override
+          public void onFailure(Throwable e) {
+            throw new RuntimeException("OpenStream failed.", e);
+          }
+        });
+    latch.await(timeoutMs, TimeUnit.MILLISECONDS);

Review Comment:
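   Please check the result of `latch.await(...)`: it returns false when the timeout elapses. In that case the stream was never opened and `streamId` is still 0, so we should fail explicitly instead of continuing.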



##########
client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java:
##########
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.*;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
+import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+/** A {@link IndexedInputGate} which ingest data from remote shuffle workers. */
+public class RemoteShuffleInputGate extends IndexedInputGate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleInputGate.class);
+
+  /** Lock to protect {@link #receivedBuffers} and {@link #cause} and {@link #closed}. */
+  private final Object lock = new Object();
+
+  /** Name of the corresponding computing task. */
+  private final String taskName;
+
+  /** Index of the gate of the corresponding computing task. */
+  private final int gateIndex;
+
+  /** Deployment descriptor for a single input gate instance. */
+  private final InputGateDeploymentDescriptor gateDescriptor;
+
+  /** Buffer pool provider. */
+  private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
+  /** Flink buffer pools to allocate network memory. */
+  private BufferPool bufferPool;
+
+  /** Buffer pool used by the transfer layer. */
+  private final TransferBufferPool transferBufferPool =
+      new TransferBufferPool(Collections.emptySet());
+
+  private final List<RemoteBufferStreamReader> bufferReaders = new ArrayList<>();
+  private final List<InputChannelInfo> channelsInfo;
+  /** Map from channel index to shuffle client index. */
+  private final int[] clientIndexMap;
+
+  /** Map from shuffle client index to channel index. */
+  private final int[] channelIndexMap;
+
+  /** The number of subpartitions that has not consumed per channel. */
+  private final int[] numSubPartitionsHasNotConsumed;
+
+  /** The overall number of subpartitions that has not been consumed. */
+  private long numUnconsumedSubpartitions;
+
+  /** Received buffers from remote shuffle worker. It's consumed by upper computing task. */
+  private final Queue<Pair<Buffer, InputChannelInfo>> receivedBuffers = new LinkedList<>();
+
+  /** {@link Throwable} when reading failure. */
+  private Throwable cause;
+
+  /** Whether this remote input gate has been closed or not. */
+  private boolean closed;
+
+  /** Whether we have opened all initial channels or not. */
+  private boolean initialChannelsOpened;
+
+  /** Number of pending {@link EndOfData} events to be received. */
+  private long pendingEndOfDataEvents;
+  /** Max concurrent reader count */
+  private int numConcurrentReading = Integer.MAX_VALUE;
+  /** Keep compatibility with streaming mode. */
+  private boolean shouldDrainOnEndOfData = true;
+
+  /** Data decompressor. */
+  private final BufferDecompressor bufferDecompressor;
+
+  private FlinkShuffleClientImpl shuffleClient;
+
+  public RemoteShuffleInputGate(
+      CelebornConf celebornConf,
+      String taskName,
+      int gateIndex,
+      InputGateDeploymentDescriptor gateDescriptor,
+      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+      BufferDecompressor bufferDecompressor) {
+
+    this.taskName = taskName;
+    this.gateIndex = gateIndex;
+    this.gateDescriptor = gateDescriptor;
+    this.bufferPoolFactory = bufferPoolFactory;
+
+    int numChannels = gateDescriptor.getShuffleDescriptors().length;
+    this.clientIndexMap = new int[numChannels];
+    this.channelIndexMap = new int[numChannels];
+    this.numSubPartitionsHasNotConsumed = new int[numChannels];
+    this.bufferDecompressor = bufferDecompressor;
+
+    RemoteShuffleDescriptor remoteShuffleDescriptor =
+        (RemoteShuffleDescriptor) gateDescriptor.getShuffleDescriptors()[0];
+    this.shuffleClient =
+        FlinkShuffleClientImpl.get(
+            remoteShuffleDescriptor.getShuffleResource().getRssMetaServiceHost(),
+            remoteShuffleDescriptor.getShuffleResource().getRssMetaServicePort(),
+            celebornConf,
+            new UserIdentifier("default", "default"));
+
+    this.numUnconsumedSubpartitions = initShuffleReadClients();
+    this.pendingEndOfDataEvents = numUnconsumedSubpartitions;
+    this.channelsInfo = createChannelInfos();
+  }
+
+  private long initShuffleReadClients() {
+    int startSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
+    int endSubIdx = gateDescriptor.getConsumedSubpartitionIndex();
+    int numSubpartitionsPerChannel = endSubIdx - startSubIdx + 1;
+    long numUnconsumedSubpartitions = 0;
+
+    // left element is index
+    List<Pair<Integer, ShuffleDescriptor>> descriptors =
+        IntStream.range(0, gateDescriptor.getShuffleDescriptors().length)
+            .mapToObj(i -> Pair.of(i, gateDescriptor.getShuffleDescriptors()[i]))
+            .collect(Collectors.toList());
+
+    int clientIndex = 0;
+    for (Pair<Integer, ShuffleDescriptor> descriptor : descriptors) {
+      RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
+      ShuffleResourceDescriptor shuffleDescriptor =
+          remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+
+      LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);
+      String applicationId = remoteDescriptor.getJobID().toString();
+
+      RemoteBufferStreamReader reader =
+          new RemoteBufferStreamReader(
+              shuffleClient,
+              shuffleDescriptor,
+              applicationId,
+              startSubIdx,
+              endSubIdx,
+              transferBufferPool,
+              getDataListener(descriptor.getLeft()),
+              getFailureListener(remoteDescriptor.getResultPartitionID()));
+
+      bufferReaders.add(reader);
+      numSubPartitionsHasNotConsumed[descriptor.getLeft()] = numSubpartitionsPerChannel;
+      numUnconsumedSubpartitions += numSubpartitionsPerChannel;
+      clientIndexMap[descriptor.getLeft()] = clientIndex;
+      channelIndexMap[clientIndex] = descriptor.getLeft();
+      ++clientIndex;
+    }
+    return numUnconsumedSubpartitions;
+  }
+
+  /** Setup gate and build network connections. */
+  @Override
+  public void setup() throws IOException {
+    long startTime = System.nanoTime();
+
+    bufferPool = bufferPoolFactory.get();
+    BufferUtils.reserveNumRequiredBuffers(bufferPool, 16);
+
+    tryRequestBuffers();
+    // Complete availability future though handshake not fired yet, thus to allow fetcher to
+    // 'pollNext' and fire handshake to remote. This mechanism is to avoid bookkeeping remote
+    // reading resource before task start processing data from input gate.
+    availabilityHelper.getUnavailableToResetAvailable().complete(null);
+    LOG.info("Set up read gate by {} ms.", (System.nanoTime() - startTime) / 1000_000);
+  }
+
+  /** Index of the gate of the corresponding computing task. */
+  @Override
+  public int getGateIndex() {
+    return gateIndex;
+  }
+
+  /** Get number of input channels. A channel is a data flow from one shuffle worker. */
+  @Override
+  public int getNumberOfInputChannels() {
+    return bufferReaders.size();
+  }
+
+  /** Whether reading is finished -- all channels are finished and cached buffers are drained. */
+  @Override
+  public boolean isFinished() {
+    synchronized (lock) {
+      return allReadersEOF() && receivedBuffers.isEmpty();
+    }
+  }
+
+  @Override
+  public Optional<BufferOrEvent> getNext() {
+    throw new UnsupportedOperationException("Not implemented (DataSet API is not supported).");
+  }
+
+  /** Poll a received {@link BufferOrEvent}. */
+  @Override
+  public Optional<BufferOrEvent> pollNext() throws IOException {
+    if (!initialChannelsOpened) {
+      tryOpenSomeChannels();
+      initialChannelsOpened = true;
+      // DO NOT return, method of 'getReceived' will manipulate 'availabilityHelper'.
+    }
+
+    Pair<Buffer, InputChannelInfo> pair = getReceived();
+    Optional<BufferOrEvent> bufferOrEvent = Optional.empty();
+    LOG.debug("pollNext called with pair null {}", pair == null);
+    while (pair != null) {
+      Buffer buffer = pair.getLeft();
+      InputChannelInfo channelInfo = pair.getRight();
+      LOG.debug("get buffer {} on channel {}", buffer, channelInfo);
+      if (buffer.isBuffer()) {
+        bufferOrEvent = transformBuffer(buffer, channelInfo);
+      } else {
+        bufferOrEvent = transformEvent(buffer, channelInfo);
+        LOG.info("recevied event: " + bufferOrEvent.get().getEvent().getClass().getName());
+      }
+
+      if (bufferOrEvent.isPresent()) {
+        break;
+      }
+      pair = getReceived();
+    }
+
+    tryRequestBuffers();
+    return bufferOrEvent;
+  }
+
+  private Buffer decompressBufferIfNeeded(Buffer buffer) throws IOException {
+    if (buffer.isCompressed()) {
+      try {
+        checkState(bufferDecompressor != null, "Buffer decompressor not set.");
+        return bufferDecompressor.decompressToIntermediateBuffer(buffer);
+      } catch (Throwable t) {
+        throw new IOException("Decompress failure", t);
+      } finally {
+        buffer.recycleBuffer();
+      }
+    }
+    return buffer;
+  }
+
+  /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
+  @Override
+  public void close() throws Exception {
+    List<Buffer> buffersToRecycle;
+    Throwable closeException = null;
+    // Do not check closed flag, thus to allow calling this method from both task thread and
+    // cancel thread.
+    for (RemoteBufferStreamReader shuffleReadClient : bufferReaders) {
+      try {
+        shuffleReadClient.close();
+      } catch (Throwable throwable) {
+        closeException = closeException == null ? throwable : closeException;
+        LOG.error("Failed to close shuffle read client.", throwable);
+      }
+    }
+    synchronized (lock) {
+      buffersToRecycle = receivedBuffers.stream().map(Pair::getLeft).collect(Collectors.toList());
+      receivedBuffers.clear();
+      closed = true;
+    }
+
+    try {
+      buffersToRecycle.forEach(Buffer::recycleBuffer);
+    } catch (Throwable throwable) {
+      closeException = closeException == null ? throwable : closeException;
+      LOG.error("Failed to recycle buffers.", throwable);
+    }
+
+    try {
+      transferBufferPool.destroy();
+    } catch (Throwable throwable) {
+      closeException = closeException == null ? throwable : closeException;
+      LOG.error("Failed to close transfer buffer pool.", throwable);
+    }
+
+    try {
+      if (bufferPool != null) {
+        bufferPool.lazyDestroy();
+      }
+    } catch (Throwable throwable) {
+      closeException = closeException == null ? throwable : closeException;
+      LOG.error("Failed to close local buffer pool.", throwable);
+    }
+
+    if (closeException != null) {
+      ExceptionUtils.rethrowException(closeException);
+    }
+  }
+
+  /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
+  @Override
+  public List<InputChannelInfo> getChannelInfos() {
+    return channelsInfo;
+  }
+
+  /** Each one corresponds to a reading channel. */
+  public List<RemoteBufferStreamReader> getBufferReaders() {
+    return bufferReaders;
+  }
+
+  private List<InputChannelInfo> createChannelInfos() {
+    return IntStream.range(0, gateDescriptor.getShuffleDescriptors().length)
+        .mapToObj(i -> new InputChannelInfo(gateIndex, i))
+        .collect(Collectors.toList());
+  }
+
+  /** Try to open more readers to {@link #numConcurrentReading}. */
+  private void tryOpenSomeChannels() throws IOException {
+    List<RemoteBufferStreamReader> clientsToOpen = new ArrayList<>();
+
+    synchronized (lock) {
+      if (closed) {
+        throw new IOException("Input gate already closed.");
+      }
+
+      LOG.debug("Try open some partition readers.");
+      int numOnGoing = 0;
+      for (int i = 0; i < bufferReaders.size(); i++) {
+        RemoteBufferStreamReader bufferStreamReader = bufferReaders.get(i);
+        LOG.debug(
+            "Trying reader: {}, isOpened={}, numSubPartitionsHasNotConsumed={}.",
+            bufferStreamReader,
+            bufferStreamReader.isOpened(),
+            numSubPartitionsHasNotConsumed[channelIndexMap[i]]);
+        if (numOnGoing >= numConcurrentReading) {

Review Comment:
   Limiting the number of concurrent readers is not supported yet; this could be added in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org