Posted to issues@flink.apache.org by "WencongLiu (via GitHub)" <gi...@apache.org> on 2023/04/04 09:26:00 UTC

[GitHub] [flink] WencongLiu opened a new pull request, #22342: [FLINK-31636][runtime] Upstream supports reading buffers from tiered store

WencongLiu opened a new pull request, #22342:
URL: https://github.com/apache/flink/pull/22342

   ## What is the purpose of the change
   
   ### Enable the upstream to read buffers from the tiered store.
   
   To achieve this, the implementation is broken down into four stages that are completed sequentially.
   
   ### Stage1: Introduce two basic components: BufferContext and BufferIndexAndSubpartitionId
   
   BufferIndexAndSubpartitionId represents the combination of a buffer index and a subpartition id.
   
   BufferContext represents the combination of Buffer and BufferIndexAndSubpartitionId.
   
   BufferContext can be read from each tier.
   
   This stage includes one commit:
   
   `Commit 1: Introduce BufferIndexAndSubpartitionId and BufferContext`
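
The two value types from Stage 1 can be pictured with a minimal sketch. This is not the code from the pull request: the class `BufferContextSketch`, the accessor names, and the use of `byte[]` in place of Flink's `Buffer` are assumptions made only to keep the example self-contained.

```java
import java.util.Objects;

/** Hypothetical stand-ins; not the classes added by this pull request. */
final class BufferContextSketch {

    /** Immutable pair of buffer index and subpartition id, usable as a map key. */
    static final class BufferIndexAndSubpartitionId {
        private final int bufferIndex;
        private final int subpartitionId;

        BufferIndexAndSubpartitionId(int bufferIndex, int subpartitionId) {
            this.bufferIndex = bufferIndex;
            this.subpartitionId = subpartitionId;
        }

        int getBufferIndex() {
            return bufferIndex;
        }

        int getSubpartitionId() {
            return subpartitionId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BufferIndexAndSubpartitionId)) {
                return false;
            }
            BufferIndexAndSubpartitionId that = (BufferIndexAndSubpartitionId) o;
            return bufferIndex == that.bufferIndex && subpartitionId == that.subpartitionId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(bufferIndex, subpartitionId);
        }
    }

    /** Pairs a payload with its position; byte[] stands in for Flink's Buffer (assumption). */
    static final class BufferContext {
        private final byte[] buffer;
        private final BufferIndexAndSubpartitionId id;

        BufferContext(byte[] buffer, int bufferIndex, int subpartitionId) {
            this.buffer = Objects.requireNonNull(buffer);
            this.id = new BufferIndexAndSubpartitionId(bufferIndex, subpartitionId);
        }

        byte[] getBuffer() {
            return buffer;
        }

        BufferIndexAndSubpartitionId getBufferIndexAndSubpartitionId() {
            return id;
        }
    }
}
```

Giving the pair value-based `equals` and `hashCode` is a design choice of the sketch; it lets the pair serve as a lookup key.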
   
   
   ### Stage2: Introduce the TierReader interface
   
   TierReader is used to read buffers from tiers. For a single subpartition, each tier creates its own tier reader. This stage introduces the interface.
   
   This stage includes one commit:
   
   `Commit 2: Introduce the TierReader interface`
   
   
   ### Stage3: Introduce the TierReaderViewId
   
   TierReaderViewId is the identifier of a TierReaderView; every TierReaderView has its own id.
   
   This stage includes one commit:
   
   `Commit 3: Introduce the TierReaderViewId`
   
   
   ### Stage4: Introduce the TierReaderView and TierReaderViewImpl
   
   Each TierReader has a corresponding TierReaderView, which each tier uses to notify the availability status of the TierReader and to get buffers and the backlog from the TierReader.
   
   This stage includes one commit:
   
   `Commit 4: Introduce the TierReaderView and TierReaderViewImpl`
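
The relationship between a reader and its view can be sketched as follows. The method names (`pollBuffer`, `getBacklog`, `notifyDataAvailable`, `getNextBuffer`), the `QueueTierReader` helper, and the use of `String` in place of a buffer are all hypothetical; the actual interfaces in the pull request may look different.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical sketch of a reader and its view; not the interfaces from this pull request. */
final class TierReaderViewSketch {

    /** Hands out the buffers of one subpartition from one tier (String stands in for a buffer). */
    interface TierReader {
        Optional<String> pollBuffer();

        int getBacklog();
    }

    /** Wraps one reader: the tier notifies availability, the consumer pulls buffers and backlog. */
    static final class TierReaderView {
        private final TierReader reader;
        private final Runnable availabilityListener;

        TierReaderView(TierReader reader, Runnable availabilityListener) {
            this.reader = reader;
            this.availabilityListener = availabilityListener;
        }

        /** Called by the tier when the reader has data to consume. */
        void notifyDataAvailable() {
            availabilityListener.run();
        }

        Optional<String> getNextBuffer() {
            return reader.pollBuffer();
        }

        int getBacklog() {
            return reader.getBacklog();
        }
    }

    /** In-memory reader used only for demonstration. */
    static final class QueueTierReader implements TierReader {
        private final Queue<String> buffers = new ArrayDeque<>();

        void add(String buffer) {
            buffers.add(buffer);
        }

        @Override
        public Optional<String> pollBuffer() {
            return Optional.ofNullable(buffers.poll());
        }

        @Override
        public int getBacklog() {
            return buffers.size();
        }
    }
}
```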
      
   
   ## Brief change log
   
   - *Introduce two basic components: BufferContext and BufferIndexAndSubpartitionId*
   - *Introduce the TierReader interface*
   - *Introduce the TierReaderViewId*
   - *Introduce the TierReaderView and TierReaderViewImpl*
   
   
   ## Verifying this change
   
   This change adds tests and can be verified by running:
   
   
     - *TierReaderViewTest*
     - *TierReaderViewIdTest*
     - *BufferIndexAndSubpartitionIdTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] WencongLiu commented on pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1578747105

   @xintongsong I have made a round of changes. Please take a look when you have time. 😄




[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1184908682


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate an error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;

Review Comment:
   Done.





[GitHub] [flink] WencongLiu closed pull request #22342: [FLINK-31636][runtime] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu closed pull request #22342: [FLINK-31636][runtime] Upstream supports reading buffers from tiered store
URL: https://github.com/apache/flink/pull/22342




[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209306292


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/ConsumerNettyService.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** {@link ConsumerNettyService} is used to consume buffer from netty client in consumer side. */
+public interface ConsumerNettyService {
+
+    /**
+     * Set up the netty service in consumer side.
+     *
+     * @param inputChannels in consumer side.
+     * @param lastPrioritySequenceNumber is the array to record the priority sequence number.
+     * @param subpartitionAvailableNotifier is used to notify the subpartition is available.
+     */
+    void setup(
+            InputChannel[] inputChannels,
+            int[] lastPrioritySequenceNumber,
+            BiConsumer<Integer, Boolean> subpartitionAvailableNotifier);
+
+    /**
+     * Read a buffer related to the specific subpartition from NettyService.

Review Comment:
   Fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/CreditBasedBufferQueueViewImpl.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The implementation of {@link CreditBasedBufferQueueView}. */
+public class CreditBasedBufferQueueViewImpl implements CreditBasedBufferQueueView {
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private int consumedBufferIndex = -1;
+
+    @GuardedBy("viewLock")

Review Comment:
   Fixed.





[GitHub] [flink] xintongsong commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1222361205


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();

Review Comment:
   Shouldn't this belong to the consumer side?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Function;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    /** The index of input channel related to the reader. */
+    private final int inputChannelIndex;
+
+    /** The provider to provide the input channel by channel index. */
+    private final Function<Integer, InputChannel> inputChannelProvider;
+
+    /** The helper is used to notify the available and priority status of reader. */
+    private final NettyConnectionReaderAvailabilityAndPriorityHelper helper;
+
+    /** The last required segment id. */
+    private int lastRequiredSegmentId = 0;
+
+    public NettyConnectionReaderImpl(
+            int inputChannelIndex,
+            Function<Integer, InputChannel> inputChannelProvider,

Review Comment:
   Why do we need this integer as the function input?
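
One possible reading of the question: the reader already stores its channel index at construction time, so the provider is always called with the same integer, and a no-argument supplier would suffice. A minimal sketch of that idea, using a hypothetical helper `bindIndex` that is not part of the pull request:

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper illustrating the review question; not code from this pull request. */
final class ChannelProviderSketch {

    /** Binds a fixed index to an index-based provider, yielding a no-argument supplier. */
    static <T> Supplier<T> bindIndex(Function<Integer, T> provider, int index) {
        return () -> provider.apply(index);
    }
}
```

With this shape, a constructor could accept a `Supplier<InputChannel>` and leave the index lookup to the caller.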



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    /**
+     * The data buffer. If the buffer is not null, bufferIndex and subpartitionId will be
+     * non-negative, error will be null, segmentId will be -1;
+     */
+    @Nullable private final Buffer buffer;
+
+    /**
+     * The error information. If the error is not null, buffer will be null, segmentId and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    @Nullable private final Throwable error;
+
+    /**
+     * The index of buffer. If the buffer index is non-negative, buffer won't be null, error will be
+     * null, subpartitionId will be non-negative, segmentId will be -1.
+     */
+    private final int bufferIndex;
+
+    /**
+     * The id of subpartition. If the subpartition id is non-negative, buffer won't be null, error
+     * will be null, bufferIndex will be non-negative, segmentId will be -1.
+     */
+    private final int subpartitionId;
+
+    /**
+     * The id of segment. If the segment id is non-negative, buffer and error will be null, and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    private final int segmentId;
+
+    private NettyPayload(
+            @Nullable Buffer buffer,
+            int bufferIndex,
+            int subpartitionId,
+            @Nullable Throwable error,
+            int segmentId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+        this.error = error;
+        this.segmentId = segmentId;
+    }
+
+    public static NettyPayload newBuffer(Buffer buffer, int bufferIndex, int subpartitionId) {
+        checkState(buffer != null && bufferIndex != -1 && subpartitionId != -1);
+        return new NettyPayload(buffer, bufferIndex, subpartitionId, null, -1);
+    }
+
+    public static NettyPayload newError(Throwable error) {
+        checkState(error != null);
+        return new NettyPayload(null, -1, -1, error, -1);
+    }
+
+    public static NettyPayload newSegment(int segmentId) {
+        checkState(segmentId != -1);
+        return new NettyPayload(null, -1, -1, null, segmentId);
+    }
+
+    public Optional<Buffer> getBuffer() {
+        return buffer != null ? Optional.of(buffer) : Optional.empty();
+    }
+
+    @Nullable
+    public Throwable getError() {
+        return error;
+    }

Review Comment:
   Optional
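
The comment appears to ask that `getError()` return an `Optional`, matching `getBuffer()`. A simplified sketch of that shape follows; the class `NettyPayloadSketch` is hypothetical, it uses `String` in place of Flink's `Buffer`, and it omits `bufferIndex` and `subpartitionId`.

```java
import java.util.Optional;

/** Simplified, hypothetical variant of NettyPayload; String stands in for Flink's Buffer. */
final class NettyPayloadSketch {
    private final String buffer; // null unless this payload carries data
    private final Throwable error; // null unless this payload carries an error
    private final int segmentId; // -1 unless this payload carries a segment id

    private NettyPayloadSketch(String buffer, Throwable error, int segmentId) {
        this.buffer = buffer;
        this.error = error;
        this.segmentId = segmentId;
    }

    static NettyPayloadSketch newBuffer(String buffer) {
        if (buffer == null) {
            throw new IllegalStateException("buffer must not be null");
        }
        return new NettyPayloadSketch(buffer, null, -1);
    }

    static NettyPayloadSketch newError(Throwable error) {
        if (error == null) {
            throw new IllegalStateException("error must not be null");
        }
        return new NettyPayloadSketch(null, error, -1);
    }

    static NettyPayloadSketch newSegment(int segmentId) {
        if (segmentId < 0) {
            throw new IllegalStateException("segmentId must be non-negative");
        }
        return new NettyPayloadSketch(null, null, segmentId);
    }

    Optional<String> getBuffer() {
        return Optional.ofNullable(buffer);
    }

    /** Returns an Optional instead of a nullable reference, as the review comment suggests. */
    Optional<Throwable> getError() {
        return Optional.ofNullable(error);
    }

    int getSegmentId() {
        return segmentId;
    }
}
```

Callers can then write `payload.getError().ifPresent(...)` rather than checking for null.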





[GitHub] [flink] WencongLiu commented on pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1536438795

   @flinkbot run azure




[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223745155


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.Queue;
+
+/** The default implementation of {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterImpl implements NettyConnectionWriter {
+
+    private final Queue<NettyPayload> bufferQueue;
+
+    private final NettyConnectionId connectionId;
+
+    public NettyConnectionWriterImpl(Queue<NettyPayload> bufferQueue) {
+        this.bufferQueue = bufferQueue;
+        this.connectionId = NettyConnectionId.newId();
+    }
+
+    @Override
+    public NettyConnectionId getNettyConnectionId() {
+        return connectionId;
+    }
+
+    @Override
+    public int numQueuedBuffers() {
+        return bufferQueue.size();
+    }
+
+    @Override
+    public void writeBuffer(NettyPayload nettyPayload) {
+        bufferQueue.add(nettyPayload);
+    }
+
+    @Override
+    public void close() {
+        NettyPayload nettyPayload;
+        while ((nettyPayload = bufferQueue.poll()) != null) {

Review Comment:
   When the NettyConnectionWriterImpl is closing, it discards all payloads still queued in the writer.
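
The drain-on-close pattern described here can be sketched generically. The body of the actual loop is not shown in the excerpt above, so the `release` action below is a hypothetical placeholder for whatever cleanup each payload needs.

```java
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch of draining a queue on close; not code from this pull request. */
final class DrainOnCloseSketch {

    /** Removes every queued element, hands each to the release action, and returns the count. */
    static <T> int drain(Queue<T> queue, Consumer<T> release) {
        int released = 0;
        T element;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((element = queue.poll()) != null) {
            release.accept(element);
            released++;
        }
        return released;
    }
}
```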





[GitHub] [flink] xintongsong commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1220671319


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {

Review Comment:
   ```suggestion
   public interface NettyServiceProducer {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {
+
+    /**
+     * Register a {@link NettyConnectionWriter} for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty connection.
+     */
+    void registerNettyConnectionWriter(

Review Comment:
   ```suggestion
       void connectionEstablished(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyProducerService.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyProducerService} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyProducerService {
+
+    /**
+     * Register a {@link NettyConnectionWriter} for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty connection.
+     */
+    void registerNettyConnectionWriter(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter);
+
+    /**
+     * Disconnect the netty connection related to the {@link NettyConnectionId}.
+     *
+     * @param connectionId connection id is the id of connection.
+     */
+    void disconnectNettyConnection(NettyConnectionId connectionId);

Review Comment:
   ```suggestion
       void connectionBroken(NettyConnectionId connectionId);
   ```
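
Taken together, the three suggestions rename the interface and its methods so that they read as connection lifecycle events. A sketch of the resulting callback shape, with `int` and `String` standing in for `TieredStorageSubpartitionId`, `NettyConnectionWriter`, and `NettyConnectionId`; the `TrackingProducer` class is hypothetical and exists only to make the example runnable:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the renamed callback; parameter types are simplified stand-ins. */
final class NettyServiceProducerSketch {

    /** Callback interface using the names proposed in the review. */
    interface NettyServiceProducer {
        void connectionEstablished(int subpartitionId, String connectionId);

        void connectionBroken(String connectionId);
    }

    /** Tracks which connections are currently live and which subpartition each one serves. */
    static final class TrackingProducer implements NettyServiceProducer {
        private final Map<String, Integer> liveConnections = new HashMap<>();

        @Override
        public void connectionEstablished(int subpartitionId, String connectionId) {
            liveConnections.put(connectionId, subpartitionId);
        }

        @Override
        public void connectionBroken(String connectionId) {
            liveConnections.remove(connectionId);
        }

        int liveConnectionCount() {
            return liveConnections.size();
        }
    }
}
```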



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;

Review Comment:
   What happened to the `@Nullable` annotations? `javax.annotation.Nullable` is imported, but the nullable fields are not annotated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;
+
+    // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId
+    // will be -1.
+    private int segmentId = -1;
+
+    public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public NettyPayload(Throwable error) {
+        this.error = error;
+    }
+
+    public NettyPayload(int segmentId) {
+        this.segmentId = segmentId;
+    }

Review Comment:
   The constructors should validate their arguments.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;
+
+    // If the segmentId is non-negative, buffer and error be null, bufferIndex and subpartitionId
+    // will be -1.
+    private int segmentId = -1;
+
+    public NettyPayload(Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public NettyPayload(Throwable error) {
+        this.error = error;
+    }
+
+    public NettyPayload(int segmentId) {
+        this.segmentId = segmentId;
+    }

Review Comment:
   I'd suggest using static factory methods rather than constructors, so that they can have meaningful names.
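
A minimal sketch of what named factories with argument checks could look like. `NettyPayloadSketch` and the factory names `newBuffer`, `newError` and `newSegment` are assumptions made for illustration, and `Object` stands in for Flink's `Buffer` so that the snippet compiles on its own; the PR may settle on different names.

```java
import java.util.Objects;

// Hypothetical stand-in for NettyPayload. Object replaces Flink's Buffer so
// that the sketch compiles without Flink on the classpath.
final class NettyPayloadSketch {
    private final Object buffer;      // null unless this is a buffer payload
    private final Throwable error;    // null unless this is an error payload
    private final int bufferIndex;    // -1 unless this is a buffer payload
    private final int subpartitionId; // -1 unless this is a buffer payload
    private final int segmentId;      // -1 unless this is a segment payload

    // Private: callers must go through the named factories below.
    private NettyPayloadSketch(
            Object buffer, Throwable error, int bufferIndex, int subpartitionId, int segmentId) {
        this.buffer = buffer;
        this.error = error;
        this.bufferIndex = bufferIndex;
        this.subpartitionId = subpartitionId;
        this.segmentId = segmentId;
    }

    static NettyPayloadSketch newBuffer(Object buffer, int bufferIndex, int subpartitionId) {
        Objects.requireNonNull(buffer, "buffer");
        if (bufferIndex < 0 || subpartitionId < 0) {
            throw new IllegalArgumentException(
                    "bufferIndex and subpartitionId must be non-negative");
        }
        return new NettyPayloadSketch(buffer, null, bufferIndex, subpartitionId, -1);
    }

    static NettyPayloadSketch newError(Throwable error) {
        Objects.requireNonNull(error, "error");
        return new NettyPayloadSketch(null, error, -1, -1, -1);
    }

    static NettyPayloadSketch newSegment(int segmentId) {
        if (segmentId < 0) {
            throw new IllegalArgumentException("segmentId must be non-negative");
        }
        return new NettyPayloadSketch(null, null, -1, -1, segmentId);
    }

    Object getBuffer() { return buffer; }
    Throwable getError() { return error; }
    int getBufferIndex() { return bufferIndex; }
    int getSubpartitionId() { return subpartitionId; }
    int getSegmentId() { return segmentId; }
}
```

With a private constructor, the invariants described in the field comments are enforced in one place, and a call such as `NettyPayloadSketch.newSegment(3)` states its intent without relying on overload resolution between `int` and `Throwable`.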



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload buffer context represents the buffer.
+     */
+    void writeBuffer(NettyPayload nettyPayload);
+
+    /**
+     * Get the id of connection in the writer.
+     *
+     * @return the id of connection.
+     */
+    NettyConnectionId getNettyConnectionId();
+
+    /**
+     * Get the number of existed buffers in the writer. The {@link NettyConnectionWriter} may be
+     * implemented based on a queue structure, this method is used to get the residual buffers in
+     * the writer.
+     *
+     * @return the buffer number.
+     */
+    int numQueuedBuffers();

Review Comment:
   ```
   The {@link NettyConnectionWriter} may be implemented based on a queue structure
   ```
   1. This belongs to the class javadoc.
   2. What is this queue?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    /** subpartitionId is used to indicate the id of subpartition. */
+    private final int subpartitionId;
+
+    /**
+     * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty
+     * connection.
+     */
+    private final InputChannel[] inputChannels;
+
+    /**
+     * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next
+     * round of reading from this channel. The Integer in the return type is the index of input
+     * channel and the Boolean in the return type is whether to queue the channel with priority.
+     */
+    private final BiConsumer<Integer, Boolean> queueChannelCallback;
+
+    /**
+     * The array is used to record the latest sequence number of buffer with priority data type,
+     * which can decide outdated status of sequence number and whether to enqueue the related input
+     * channel to {@link SingleInputGate}.
+     */
+    private final int[] lastPrioritySequenceNumber;

Review Comment:
   1. `queueChannelCallback` -> `availabilityAndPriorityNotifier`.
   2. Should expose `lastPrioritySequenceNumber` via interfaces, rather than internal fields shared by two classes.
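
One way to read the second point, sketched under assumptions: the owner of the priority sequence numbers keeps the array private and hands the reader a narrow interface instead. The interface name, the method names and `GateSketch` are hypothetical and are not taken from the PR.

```java
import java.util.Arrays;

// Hypothetical interface; names and signatures are illustrative only.
interface AvailabilityAndPriorityNotifierSketch {
    // Ask the owner to queue the channel, optionally with priority.
    void notifyAvailableAndPriority(int channelIndex, boolean isPriority);

    // Record the latest priority sequence number seen on the channel.
    void updatePrioritySequenceNumber(int channelIndex, int sequenceNumber);
}

// Hypothetical owner (the role SingleInputGate plays in the PR). The array
// stays private; other classes can only reach it through the interface.
final class GateSketch implements AvailabilityAndPriorityNotifierSketch {
    private final int[] lastPrioritySequenceNumber;
    private int queuedWithPriority = 0;

    GateSketch(int numberOfChannels) {
        lastPrioritySequenceNumber = new int[numberOfChannels];
        Arrays.fill(lastPrioritySequenceNumber, Integer.MIN_VALUE);
    }

    @Override
    public void notifyAvailableAndPriority(int channelIndex, boolean isPriority) {
        if (isPriority) {
            queuedWithPriority++;
        }
    }

    @Override
    public void updatePrioritySequenceNumber(int channelIndex, int sequenceNumber) {
        lastPrioritySequenceNumber[channelIndex] = sequenceNumber;
    }

    int getLastPrioritySequenceNumber(int channelIndex) {
        return lastPrioritySequenceNumber[channelIndex];
    }

    int getQueuedWithPriority() {
        return queuedWithPriority;
    }
}
```

The reader would then depend only on the interface, so the array can no longer be mutated from two classes at once.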



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionId.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import java.util.Objects;
+
+/** {@link NettyConnectionId} indicates the unique id of netty connection. */
+public class NettyConnectionId {
+
+    // The default id of netty connection.
+    private static int defaultId = 0;

Review Comment:
   1. Should not be named with "default".
   2. The static field can be concurrently accessed, and would have consistency issues.
   
   I'd suggest using a large random value, or partitionId + subpartitionId + a small random value.
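
A minimal sketch of the "large random value" option, assuming a random `java.util.UUID` is an acceptable id. `NettyConnectionIdSketch` and `newId()` are hypothetical names; the partitionId + subpartitionId variant is not shown.

```java
import java.util.UUID;

// Hypothetical stand-in for NettyConnectionId.
final class NettyConnectionIdSketch {
    private final UUID value;

    private NettyConnectionIdSketch(UUID value) {
        this.value = value;
    }

    // UUID.randomUUID() draws 122 random bits, so no shared mutable counter
    // is needed and concurrent callers do not have to coordinate.
    static NettyConnectionIdSketch newId() {
        return new NettyConnectionIdSketch(UUID.randomUUID());
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NettyConnectionIdSketch
                && value.equals(((NettyConnectionIdSketch) o).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value.toString();
    }
}
```

If sequential ids are preferred for readability in logs, a `java.util.concurrent.atomic.AtomicLong` counter would address the concurrency concern as well, at the cost of keeping static state.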



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    /** subpartitionId is used to indicate the id of subpartition. */
+    private final int subpartitionId;
+
+    /**
+     * inputChannels is initialized in {@link SingleInputGate} and used to read buffer from netty
+     * connection.
+     */
+    private final InputChannel[] inputChannels;
+
+    /**
+     * queueChannelCallback is used to queue a channel to {@link SingleInputGate} to trigger next
+     * round of reading from this channel. The Integer in the return type is the index of input
+     * channel and the Boolean in the return type is whether to queue the channel with priority.
+     */
+    private final BiConsumer<Integer, Boolean> queueChannelCallback;
+
+    /**
+     * The array is used to record the latest sequence number of buffer with priority data type,
+     * which can decide outdated status of sequence number and whether to enqueue the related input
+     * channel to {@link SingleInputGate}.
+     */
+    private final int[] lastPrioritySequenceNumber;
+
+    /** The last required segment id. */
+    private int lastRequiredSegmentId = 0;
+
+    public NettyConnectionReaderImpl(
+            int subpartitionId,
+            InputChannel[] inputChannels,
+            BiConsumer<Integer, Boolean> queueChannelCallback,
+            int[] lastPrioritySequenceNumber) {
+        this.subpartitionId = subpartitionId;
+        this.inputChannels = inputChannels;
+        this.queueChannelCallback = queueChannelCallback;
+        this.lastPrioritySequenceNumber = lastPrioritySequenceNumber;
+    }
+
+    @Override
+    public Optional<Buffer> readBuffer(int segmentId) {
+        if (segmentId > 0L && (segmentId != lastRequiredSegmentId)) {
+            lastRequiredSegmentId = segmentId;
+            inputChannels[subpartitionId].notifyRequiredSegmentId(segmentId);
+        }
+        Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = Optional.empty();
+        try {
+            bufferAndAvailability = inputChannels[subpartitionId].getNextBuffer();

Review Comment:
   This doesn't seem right.
   - Number of channels corresponds to the upstream parallelism, while subpartitionId corresponds to the downstream parallelism. We cannot use subpartitionId to index an input channel.
   - If we only need to access `inputChannels[subpartitionId]` in this class, why would we need the entire array?
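
To illustrate the second bullet, here is a sketch in which the reader receives the single channel it reads from, so that no indexing by `subpartitionId` happens inside the reader. `ChannelSketch` and `ReaderSketch` are hypothetical stand-ins, and `String` replaces Flink's `Buffer`; how the caller picks the right channel is left open, as in the comment.

```java
import java.util.Optional;

// Hypothetical stand-in for the two InputChannel methods used by the reader.
interface ChannelSketch {
    Optional<String> getNextBuffer();

    void notifyRequiredSegmentId(int segmentId);
}

// Hypothetical reader that owns exactly one channel instead of the full array.
final class ReaderSketch {
    private final ChannelSketch inputChannel;
    private int lastRequiredSegmentId = 0;

    ReaderSketch(ChannelSketch inputChannel) {
        this.inputChannel = inputChannel;
    }

    Optional<String> readBuffer(int segmentId) {
        // Notify the upstream only when a new, positive segment id is required.
        if (segmentId > 0 && segmentId != lastRequiredSegmentId) {
            lastRequiredSegmentId = segmentId;
            inputChannel.notifyRequiredSegmentId(segmentId);
        }
        return inputChannel.getNextBuffer();
    }
}
```

The mapping from a subpartition to an input channel then lives with whoever constructs the reader, which is also where the upstream/downstream parallelism mismatch from the first bullet would have to be resolved.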



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    private final Map<TieredStoragePartitionId, List<NettyProducerService>>
+            registeredProducerServices = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, InputChannel[]>>
+            registeredInputChannels = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, BiConsumer<Integer, Boolean>>>
+            registeredQueueChannelCallbacks = new ConcurrentHashMap<>();
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, int[]>>
+            registeredPriorityArrays = new ConcurrentHashMap<>();

Review Comment:
   Might be better to group them by producer and consumer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    // If the buffer is not null, bufferIndex and subpartitionId will be non-negative, error will be
+    // null, segmentId will be -1;
+    private Buffer buffer;
+
+    // If the error is not null, buffer will be null, segmentId and bufferIndex and subpartitionId
+    // will be -1.
+    private Throwable error;
+
+    // If the bufferIndex is non-negative, buffer won't be null, error will be null, subpartitionId
+    // will be non-negative, segmentId will be -1.
+    private int bufferIndex = -1;
+
+    // If the subpartitionId is non-negative, buffer won't be null, error will be null, bufferIndex
+    // will be non-negative, segmentId will be -1.
+    private int subpartitionId = -1;

Review Comment:
   Use JavaDoc.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {

Review Comment:
   Take a look at `BufferIndexOrError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] WencongLiu commented on pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1536283987

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1222589878


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -311,6 +311,16 @@ public long unsynchronizedGetSizeOfQueuedBuffers() {
         return 0;
     }
 
+    /**
+     * Notify the upstream the id of required segment that should be sent to netty connection.

Review Comment:
   This sentence reads as ungrammatical 🤔



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderAvailabilityAndPriorityHelper.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+/**
+ * {@link NettyConnectionReaderAvailabilityAndPriorityHelper} is used to help the reader notify the
+ * available and priority status of {@link NettyConnectionReader}, and update the priority sequence

Review Comment:
   ```suggestion
    * availability and priority status of {@link NettyConnectionReader}, and update the priority sequence
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.util.Optional;
+
+/** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read buffer from netty. */
+public interface NettyConnectionReader {
+    /**
+     * Read a buffer from netty connection.
+     *
+     * @param segmentId segment id indicates the id of segment.
+     * @return Optional.empty() will be returned if there is no buffer sent from netty connection

Review Comment:
   ```suggestion
        * @return {@link Optional#empty()} will be returned if there is no buffer sent from netty connection
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+    /** The singleton instance of this event. */
+    public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();

Review Comment:
   We should make the default constructor private if this class is meant to be a singleton.
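
A sketch of the suggested change, with `EndOfSegmentEventSketch` as a hypothetical stand-in that does not extend Flink's `RuntimeEvent`. Whether any serialization path in the PR needs to instantiate the event reflectively is an open assumption that should be checked before hiding the constructor.

```java
// Hypothetical stand-in for EndOfSegmentEvent.
final class EndOfSegmentEventSketch {

    /** The singleton instance of this event. */
    static final EndOfSegmentEventSketch INSTANCE = new EndOfSegmentEventSketch();

    // Private constructor: INSTANCE is the only way to obtain the event.
    private EndOfSegmentEventSketch() {}

    @Override
    public boolean equals(Object obj) {
        return obj != null && obj.getClass() == EndOfSegmentEventSketch.class;
    }

    @Override
    public int hashCode() {
        return 1965146673;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName();
    }
}
```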



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload netty payload represents the buffer.

Review Comment:
   If it represents the buffer, why not use `Buffer` directly here? I'd prefer changing this description to `the payload sent to the netty connection`.
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testWriteBuffer() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testGetNettyConnectionId() {
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+    }
+
+    @Test
+    void testNumQueuedBuffers() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testClose() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        nettyConnectionWriter.close();
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0);
+    }
+
+    private void writeBufferTpWriter(

Review Comment:
   ```suggestion
       private void writeBufferToWriter(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishConsumer;

Review Comment:
   ```suggestion
               connectionEstablishedConsumer;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+    /**
+     * Establish a netty connection for a subpartition.

Review Comment:
   This Javadoc is a bit confusing. It reads as if this method proactively establishes a connection, but in reality it is just a callback.
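
One possible rewording, sketched on a simplified stand-in rather than the real interface. `ConnectionCallback`, `FakeServer`, and the `int`/`String` parameter types are assumptions made only so the sketch compiles on its own; the point is that the Javadoc states who invokes the method and when.

```java
/** Hypothetical, simplified stand-in for a callback-style interface. */
interface ConnectionCallback {

    /**
     * Called by the server side after a connection for the given subpartition has been
     * established. Implementations react to the event; they do not open the connection.
     *
     * @param subpartitionId index of the subpartition the connection belongs to.
     * @param writerName illustrative handle for the writer bound to the new connection.
     */
    void connectionEstablished(int subpartitionId, String writerName);
}

/** Hypothetical caller that owns the connection lifecycle and fires the callback. */
final class FakeServer {

    private final ConnectionCallback callback;

    FakeServer(ConnectionCallback callback) {
        this.callback = callback;
    }

    /** Pretends to accept a connection, then notifies the registered callback. */
    void acceptConnection(int subpartitionId) {
        callback.connectionEstablished(subpartitionId, "writer-" + subpartitionId);
    }
}
```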



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishConsumer;
+
+    private final Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+    public TestingNettyServiceProducer(
+            BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+                    connectionEstablishConsumer,
+            Consumer<NettyConnectionId> connectionBrokenConsumer) {
+        this.connectionEstablishConsumer = connectionEstablishConsumer;
+        this.connectionBrokenConsumer = connectionBrokenConsumer;
+    }
+
+    @Override
+    public void connectionEstablished(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter) {
+        connectionEstablishConsumer.accept(subpartitionId, nettyConnectionWriter);
+    }
+
+    @Override
+    public void connectionBroken(NettyConnectionId connectionId) {
+        connectionBrokenConsumer.accept(connectionId);
+    }
+
+    /** Builder for {@link TestingNettyServiceProducer}. */
+    public static class Builder {
+
+        private BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+                connectionEstablishConsumer;
+
+        private Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+        public Builder() {}

Review Comment:
   Why do we need this constructor instead of the default one generated by the compiler?
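
For context, a small sketch of the language rule behind this comment: a Java class that declares no constructor gets an implicit no-argument one with the same access modifier as the class. `SampleBuilder` below is a hypothetical class, not the builder from this PR.

```java
import java.util.function.Consumer;

/** Hypothetical builder that declares no constructor at all. */
class SampleBuilder {

    // Defaults to a no-op so that build() works even if the setter is never called.
    private Consumer<String> onEvent = event -> {};

    SampleBuilder setOnEvent(Consumer<String> onEvent) {
        this.onEvent = onEvent;
        return this;
    }

    Consumer<String> build() {
        return onEvent;
    }
}
```

`new SampleBuilder()` compiles here even though no constructor is written out, which is why an empty explicit one adds nothing.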



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();

Review Comment:
   ```suggestion
           assertThat(nettyPayload.getBuffer()).isPresent();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {

Review Comment:
   ```suggestion
       void testGetError() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+    }
+
+    @Test
+    void getSubpartitionId() {

Review Comment:
   ```suggestion
       void testGetSubpartitionId() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {

Review Comment:
   ```suggestion
       void testGetBufferIndex() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);
+    }
+
+    @Test
+    void getBufferIndex() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBufferIndex()).isEqualTo(bufferIndex);
+    }
+
+    @Test
+    void getSubpartitionId() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getSubpartitionId()).isEqualTo(subpartitionId);
+    }
+
+    @Test
+    void getSegmentId() {

Review Comment:
   ```suggestion
       void testGetSegmentId() {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    /**
+     * The data buffer. If the buffer is not null, bufferIndex and subpartitionId will be
+     * non-negative, error will be null, segmentId will be -1;
+     */
+    @Nullable private final Buffer buffer;
+
+    /**
+     * The error information. If the error is not null, buffer will be null, segmentId and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    @Nullable private final Throwable error;
+
+    /**
+     * The index of buffer. If the buffer index is non-negative, buffer won't be null, error will be
+     * null, subpartitionId will be non-negative, segmentId will be -1.
+     */
+    private final int bufferIndex;
+
+    /**
+     * The id of subpartition. If the subpartition id is non-negative, buffer won't be null, error
+     * will be null, bufferIndex will be non-negative, segmentId will be -1.
+     */
+    private final int subpartitionId;
+
+    /**
+     * The id of segment. If the segment id is non-negative, buffer and error be null, bufferIndex

Review Comment:
   ```suggestion
        * The id of segment. If the segment id is non-negative, buffer and error will be null, bufferIndex
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayload.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link NettyPayload} represents the payload that will be transferred to netty connection. It
+ * could indicate a combination of buffer, buffer index, and its subpartition id, and it could also
+ * indicate an error or a segment id.
+ */
+public class NettyPayload {
+
+    /**
+     * The data buffer. If the buffer is not null, bufferIndex and subpartitionId will be
+     * non-negative, error will be null, segmentId will be -1;
+     */
+    @Nullable private final Buffer buffer;
+
+    /**
+     * The error information. If the error is not null, buffer will be null, segmentId and
+     * bufferIndex and subpartitionId will be -1.
+     */
+    @Nullable private final Throwable error;
+
+    /**
+     * The index of buffer. If the buffer index is non-negative, buffer won't be null, error will be
+     * null, subpartitionId will be non-negative, segmentId will be -1.
+     */
+    private final int bufferIndex;
+
+    /**
+     * The id of subpartition. If the subpartition id is non-negative, buffer won't be null, error
+     * will be null, bufferIndex will be non-negative, segmentId will be -1.
+     */
+    private final int subpartitionId;
+
+    /**
+     * The id of segment. If the segment id is non-negative, buffer and error be null, bufferIndex
+     * and subpartitionId will be -1.
+     */
+    private final int segmentId;
+
+    private NettyPayload(
+            @Nullable Buffer buffer,
+            int bufferIndex,
+            int subpartitionId,
+            @Nullable Throwable error,
+            int segmentId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+        this.error = error;
+        this.segmentId = segmentId;
+    }
+
+    public static NettyPayload newBuffer(Buffer buffer, int bufferIndex, int subpartitionId) {
+        checkState(buffer != null && bufferIndex != -1 && subpartitionId != -1);
+        return new NettyPayload(buffer, bufferIndex, subpartitionId, null, -1);
+    }
+
+    public static NettyPayload newError(Throwable error) {
+        checkState(error != null);
+        return new NettyPayload(null, -1, -1, error, -1);

Review Comment:
   ```suggestion
           return new NettyPayload(null, -1, -1, checkNotNull(error), -1);
   ```
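
The suggestion works because a null check that returns its argument can be inlined into the constructor call. The sketch below uses `java.util.Objects.requireNonNull` from the JDK as a stand-in for Flink's `Preconditions.checkNotNull` so that it runs without Flink on the classpath; `ErrorPayload` is a hypothetical, stripped-down class and not the real `NettyPayload`.

```java
import java.util.Objects;

/** Hypothetical, stripped-down payload that only carries an error. */
final class ErrorPayload {

    private final Throwable error;

    private ErrorPayload(Throwable error) {
        this.error = error;
    }

    /** Validates and passes the argument in one expression; throws NullPointerException on null. */
    static ErrorPayload newError(Throwable error) {
        return new ErrorPayload(Objects.requireNonNull(error));
    }

    Throwable getError() {
        return error;
    }
}
```

One behavioral difference to keep in mind when switching: a failed null check raises `NullPointerException`, whereas a failed state check raises `IllegalStateException`.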



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {
+                inputChannel.read(
+                        new NetworkBuffer(
+                                MemorySegmentFactory.allocateUnpooledSegment(0),
+                                BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                priority ? PRIORITIZED_EVENT_BUFFER : DATA_BUFFER),
+                        NONE);
+            } else {
+                for (int index = 0; index < bufferNumber; ++index) {
+                    inputChannel.read(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                    priority ? PRIORITIZED_EVENT_BUFFER : DATA_BUFFER));
+                }
+            }
+        } catch (IOException | InterruptedException e) {
+            ExceptionUtils.rethrow(e, "Failed to create test input channel.");
+        }
+        return () -> inputChannel;
+    }
+
+    private NettyConnectionReader createNettyConnectionReader(

Review Comment:
   This method can be `static`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.Queue;
+
+/** The default implementation of {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterImpl implements NettyConnectionWriter {
+
+    private final Queue<NettyPayload> bufferQueue;
+
+    private final NettyConnectionId connectionId;
+
+    public NettyConnectionWriterImpl(Queue<NettyPayload> bufferQueue) {
+        this.bufferQueue = bufferQueue;
+        this.connectionId = NettyConnectionId.newId();
+    }
+
+    @Override
+    public NettyConnectionId getNettyConnectionId() {
+        return connectionId;
+    }
+
+    @Override
+    public int numQueuedBuffers() {
+        return bufferQueue.size();
+    }
+
+    @Override
+    public void writeBuffer(NettyPayload nettyPayload) {
+        bufferQueue.add(nettyPayload);
+    }
+
+    @Override
+    public void close() {
+        NettyPayload nettyPayload;
+        while ((nettyPayload = bufferQueue.poll()) != null) {

Review Comment:
   I wonder whether we can guarantee that `bufferQueue` contains no `Throwable`-type payload when it is closed. If not, what happens in that case?
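
To make the concern concrete, here is a minimal, self-contained sketch of one way `close()` could tolerate an error payload. It is not the PR's implementation: `FakeBuffer`, `FakePayload` and `DrainingWriter` are hypothetical stand-ins, and the sketch assumes a payload holds either a buffer or an error, each exposed as an `Optional`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical stand-in for a buffer; it only records whether it was recycled. */
class FakeBuffer {
    private boolean recycled;

    void recycleBuffer() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical stand-in for a payload, assumed to hold either a buffer or an error. */
class FakePayload {
    private final FakeBuffer buffer;
    private final Throwable error;

    private FakePayload(FakeBuffer buffer, Throwable error) {
        this.buffer = buffer;
        this.error = error;
    }

    static FakePayload newBuffer(FakeBuffer buffer) {
        return new FakePayload(buffer, null);
    }

    static FakePayload newError(Throwable error) {
        return new FakePayload(null, error);
    }

    Optional<FakeBuffer> getBuffer() {
        return Optional.ofNullable(buffer);
    }

    Optional<Throwable> getError() {
        return Optional.ofNullable(error);
    }
}

/** Sketch of a writer whose close() tolerates error payloads left in the queue. */
class DrainingWriter {
    private final Queue<FakePayload> bufferQueue;

    DrainingWriter(Queue<FakePayload> bufferQueue) {
        this.bufferQueue = bufferQueue;
    }

    void close() {
        FakePayload payload;
        while ((payload = bufferQueue.poll()) != null) {
            // An error payload carries no buffer, so ifPresent() simply skips it.
            payload.getBuffer().ifPresent(FakeBuffer::recycleBuffer);
        }
    }
}

class DrainingWriterDemo {
    public static void main(String[] args) {
        FakeBuffer buffer = new FakeBuffer();
        Queue<FakePayload> queue = new ArrayDeque<>();
        queue.add(FakePayload.newBuffer(buffer));
        queue.add(FakePayload.newError(new RuntimeException("test exception")));
        new DrainingWriter(queue).close();
        System.out.println(buffer.isRecycled() + " " + queue.isEmpty());
    }
}
```

If the real `close()` instead unwraps the buffer unconditionally, an error payload in the queue would probably make it throw, which is the case I am asking about.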



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffers to netty
+ * connection. Buffers in the writer will be written to a queue structure and netty server will send
+ * buffers from it.
+ */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer to netty connection.
+     *
+     * @param nettyPayload netty payload represents the buffer.
+     */
+    void writeBuffer(NettyPayload nettyPayload);
+
+    /**
+     * Get the id of connection in the writer.
+     *
+     * @return the id of connection.
+     */
+    NettyConnectionId getNettyConnectionId();
+
+    /**
+     * Get the number of written but unsent buffers.
+     *
+     * @return the buffer number.
+     */
+    int numQueuedBuffers();
+
+    /** Close the connection and release all resources. */
+    void close();

Review Comment:
   Maybe this interface could extend `AutoCloseable`.
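
A rough sketch of what that would buy us, using a hypothetical trimmed-down interface (`SketchConnectionWriter` and its implementation are illustrative stand-ins, with a `String` payload instead of `NettyPayload`). Re-declaring `close()` without a checked exception lets callers use try-with-resources without a catch block.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, trimmed-down writer interface; not the real NettyConnectionWriter. */
interface SketchConnectionWriter extends AutoCloseable {
    void writeBuffer(String payload);

    int numQueuedBuffers();

    /** Declared without a checked exception, so callers need no catch block. */
    @Override
    void close();
}

/** Hypothetical implementation that drops all queued payloads on close(). */
class SketchConnectionWriterImpl implements SketchConnectionWriter {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean closed;

    @Override
    public void writeBuffer(String payload) {
        queue.add(payload);
    }

    @Override
    public int numQueuedBuffers() {
        return queue.size();
    }

    @Override
    public void close() {
        queue.clear();
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class AutoCloseableDemo {
    /** Writes one payload and relies on try-with-resources to call close(). */
    static SketchConnectionWriterImpl writeAndClose() {
        SketchConnectionWriterImpl writer = new SketchConnectionWriterImpl();
        try (SketchConnectionWriter resource = writer) {
            resource.writeBuffer("payload");
        }
        return writer;
    }

    public static void main(String[] args) {
        SketchConnectionWriterImpl writer = writeAndClose();
        System.out.println(writer.isClosed() + " " + writer.numQueuedBuffers());
    }
}
```

This would mostly help tests and short-lived call sites; long-lived writers would still call `close()` explicitly.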



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();

Review Comment:
   AssertJ has dedicated assertions for `Optional` and `CompletableFuture` (for example, `assertThat(buffer).isPresent()` and `assertThat(future).isNotDone()`); please use those. Please also check for the same issue in all the other tests.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {
+                inputChannel.read(
+                        new NetworkBuffer(
+                                MemorySegmentFactory.allocateUnpooledSegment(0),
+                                BufferRecycler.DummyBufferRecycler.INSTANCE,

Review Comment:
   Why not use `FreeingBufferRecycler.INSTANCE`?
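
For context, my understanding is that the dummy recycler ignores the recycled segment while the freeing one releases it. The sketch below illustrates that difference with hypothetical stand-in types (`SketchSegment`, `SketchRecycler`, `SketchRecyclers`); it does not use the real Flink classes.

```java
/** Hypothetical stand-in for a memory segment; it only tracks whether it was freed. */
class SketchSegment {
    private boolean freed;

    void free() {
        freed = true;
    }

    boolean isFreed() {
        return freed;
    }
}

/** Hypothetical stand-in for the recycler callback invoked when a buffer is released. */
interface SketchRecycler {
    void recycle(SketchSegment segment);
}

class SketchRecyclers {
    /** No-op recycler: the segment stays allocated after recycle(). */
    static final SketchRecycler DUMMY = segment -> {};

    /** Freeing recycler: releases the segment as soon as it is recycled. */
    static final SketchRecycler FREEING = SketchSegment::free;
}

class RecyclerDemo {
    public static void main(String[] args) {
        SketchSegment kept = new SketchSegment();
        SketchSegment released = new SketchSegment();
        SketchRecyclers.DUMMY.recycle(kept);
        SketchRecyclers.FREEING.recycle(released);
        System.out.println(kept.isFreed() + " " + released.isFreed());
    }
}
```

With a zero-length unpooled segment the practical difference is small, but using the freeing variant keeps this test consistent with `NettyPayloadTest`.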



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);
+    }
+
+    @Test
+    void getError() {
+        Throwable error = new RuntimeException("test exception");
+        NettyPayload nettyPayload = NettyPayload.newError(error);
+        assertThat(nettyPayload.getError()).isEqualTo(error);

Review Comment:
   ```suggestion
           assertThat(nettyPayload.getError()).hasValue(error);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testWriteBuffer() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testGetNettyConnectionId() {
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+    }
+
+    @Test
+    void testNumQueuedBuffers() {

Review Comment:
   Could we merge this method into `testWriteBuffer`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceProducer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/**
+ * {@link NettyServiceProducer} is used as the callback to register {@link NettyConnectionWriter}
+ * and disconnect netty connection in {@link TierProducerAgent}.
+ */
+public interface NettyServiceProducer {
+
+    /**
+     * Establish a netty connection for a subpartition.
+     *
+     * @param subpartitionId subpartition id indicates the id of subpartition.
+     * @param nettyConnectionWriter writer is used to write buffers to netty connection.
+     */
+    void connectionEstablished(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter);
+
+    /**
+     * Break the netty connection related to the {@link NettyConnectionId}.

Review Comment:
   Ditto.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionView.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+
+/**
+ * The {@link TieredStoreResultSubpartitionView} is the implementation of {@link
+ * ResultSubpartitionView} of {@link TieredResultPartition}.
+ */
+public class TieredStoreResultSubpartitionView implements ResultSubpartitionView {
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private final List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private final List<NettyServiceProducer> serviceProducers;
+
+    private final List<NettyConnectionId> nettyConnectionIds;
+
+    private boolean isReleased = false;

Review Comment:
   Is this field thread-safe?
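
   If the flag can be read and written from more than one thread, one possible approach is an atomic flag. The following is only a minimal sketch: `ReleasableView` and its methods are hypothetical names used for illustration, not code from this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the subpartition view; only the release flag is modelled. */
class ReleasableView {

    // AtomicBoolean makes the flag visible across threads and lets
    // release() decide atomically which caller performs the cleanup.
    private final AtomicBoolean released = new AtomicBoolean(false);

    /** Returns true only for the single call that actually performs the release. */
    boolean release() {
        if (!released.compareAndSet(false, true)) {
            return false; // another caller already released the view
        }
        // Cleanup (e.g. notifying producers) would go here and runs at most once.
        return true;
    }

    boolean isReleased() {
        return released.get();
    }

    public static void main(String[] args) {
        ReleasableView view = new ReleasableView();
        System.out.println(view.release()); // first call wins
        System.out.println(view.release()); // later calls are no-ops
    }
}
```

   If the field is only ever accessed from a single thread, a comment stating that would be enough instead.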



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {

Review Comment:
   There is too much duplicated code in these tests. I'd prefer to extract and reuse their common parts.
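
   One way to do this could be a small fixture that owns the futures and hands out the callbacks, so that each test only states what differs. This is a rough sketch under assumptions: `ReaderTestFixture` and its members are hypothetical, and `Map.Entry` stands in for Flink's `Tuple2` to keep the snippet self-contained.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical fixture bundling the futures that each test currently declares by hand. */
class ReaderTestFixture {

    final CompletableFuture<Map.Entry<Integer, Boolean>> availableAndPriority =
            new CompletableFuture<>();
    final CompletableFuture<Map.Entry<Integer, Integer>> prioritySequenceNumber =
            new CompletableFuture<>();
    final CompletableFuture<Integer> requiredSegmentId = new CompletableFuture<>();

    /** Callback that records the (channel index, priority) notification. */
    BiConsumer<Integer, Boolean> availableAndPriorityConsumer() {
        return (channelIndex, priority) ->
                availableAndPriority.complete(
                        new SimpleImmutableEntry<>(channelIndex, priority));
    }

    /** Callback that records the (channel index, sequence number) notification. */
    BiConsumer<Integer, Integer> prioritySequenceNumberConsumer() {
        return (channelIndex, sequenceNumber) ->
                prioritySequenceNumber.complete(
                        new SimpleImmutableEntry<>(channelIndex, sequenceNumber));
    }

    /** Shared assertion helper: true while none of the callbacks has been invoked. */
    boolean noCallbackFired() {
        return !availableAndPriority.isDone()
                && !prioritySequenceNumber.isDone()
                && !requiredSegmentId.isDone();
    }

    public static void main(String[] args) {
        ReaderTestFixture fixture = new ReaderTestFixture();
        System.out.println(fixture.noCallbackFired());
        fixture.availableAndPriorityConsumer().accept(0, true);
        System.out.println(fixture.noCallbackFired());
    }
}
```

   Each test would then create one fixture, build the reader from its callbacks, and assert on the fixture's futures.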



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {

Review Comment:
   We should disable the no-argument constructor (`ctr`) of this class by making it private.
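
   For illustration, a testing class whose instances can only be obtained through a builder might look roughly like this. The names below (`TestingProducer`, the setter, the `String` and `int` parameter types) are placeholders I made up, not the actual signatures in this PR.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical testing class; plain Integer/String stand in for the real id and writer types. */
class TestingProducer {

    private final BiConsumer<Integer, String> connectionEstablishedConsumer;
    private final Consumer<String> connectionBrokenConsumer;

    // Private: callers cannot instantiate the class directly and must use the builder.
    private TestingProducer(
            BiConsumer<Integer, String> connectionEstablishedConsumer,
            Consumer<String> connectionBrokenConsumer) {
        this.connectionEstablishedConsumer = connectionEstablishedConsumer;
        this.connectionBrokenConsumer = connectionBrokenConsumer;
    }

    void connectionEstablished(int subpartitionId, String writer) {
        connectionEstablishedConsumer.accept(subpartitionId, writer);
    }

    void connectionBroken(String connectionId) {
        connectionBrokenConsumer.accept(connectionId);
    }

    static Builder builder() {
        return new Builder();
    }

    /** Builder with no-op defaults, so a test only overrides the callback it cares about. */
    static final class Builder {
        private BiConsumer<Integer, String> established = (id, writer) -> {};
        private Consumer<String> broken = id -> {};

        private Builder() {}

        Builder setConnectionEstablishedConsumer(BiConsumer<Integer, String> consumer) {
            this.established = consumer;
            return this;
        }

        Builder setConnectionBrokenConsumer(Consumer<String> consumer) {
            this.broken = consumer;
            return this;
        }

        TestingProducer build() {
            return new TestingProducer(established, broken);
        }
    }

    public static void main(String[] args) {
        TestingProducer producer =
                TestingProducer.builder()
                        .setConnectionBrokenConsumer(id -> System.out.println("broken: " + id))
                        .build();
        producer.connectionEstablished(0, "writer"); // uses the no-op default
        producer.connectionBroken("connection-1");
    }
}
```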



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {

Review Comment:
   ```suggestion
       void testGetBuffer() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {

Review Comment:
   I wonder why we need to make a distinction here. It seems that this is only for deciding the `nextDataType`, 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    // ------------------------------------
+    //          For consumer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+            registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<
+                            TieredStorageSubpartitionId,
+                            NettyConnectionReaderAvailabilityAndPriorityHelper>>
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+                    new ConcurrentHashMap<>();
+
+    @Override
+    public void registerProducer(
+            TieredStoragePartitionId partitionId, NettyServiceProducer serviceProducer) {
+        List<NettyServiceProducer> serviceProducers =
+                registeredServiceProducers.getOrDefault(partitionId, new ArrayList<>());
+        serviceProducers.add(serviceProducer);
+        registeredServiceProducers.put(partitionId, serviceProducers);

Review Comment:
   I'd prefer using a pattern like `registeredServiceProducers.computeIfAbsent().add()`.
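
   To spell the idea out: `getOrDefault` followed by `put` is two separate steps, so two threads registering for the same partition can each create a list and one registration may be lost. A minimal sketch of the alternative, with `String` keys and values as stand-ins for the real types and `ProducerRegistry` as a hypothetical name:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical registry; String stands in for the partition id and producer types. */
class ProducerRegistry {

    private final Map<String, List<String>> registered = new ConcurrentHashMap<>();

    void register(String partitionId, String producer) {
        // computeIfAbsent creates the list at most once per key, atomically.
        // The list itself is thread-safe too, because add() runs outside the map's atomic step.
        registered
                .computeIfAbsent(partitionId, ignored -> new CopyOnWriteArrayList<>())
                .add(producer);
    }

    List<String> producersOf(String partitionId) {
        return registered.getOrDefault(partitionId, Collections.emptyList());
    }

    public static void main(String[] args) {
        ProducerRegistry registry = new ProducerRegistry();
        registry.register("partition-0", "memory-tier");
        registry.register("partition-0", "disk-tier");
        System.out.println(registry.producersOf("partition-0"));
    }
}
```

   Whether a concurrent list is needed depends on which threads call `registerProducer`; if registration is single-threaded, a plain `ArrayList` inside `computeIfAbsent` would do.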



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);

Review Comment:
   ```suggestion
           Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer(0);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {
+
+    @Test
+    void getBuffer() {
+        Buffer buffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(0),
+                        FreeingBufferRecycler.INSTANCE,
+                        Buffer.DataType.DATA_BUFFER,
+                        0);
+        int bufferIndex = 0;
+        int subpartitionId = 1;
+        NettyPayload nettyPayload = NettyPayload.newBuffer(buffer, bufferIndex, subpartitionId);
+        assertThat(nettyPayload.getBuffer().isPresent()).isTrue();
+        assertThat(nettyPayload.getBuffer().get()).isEqualTo(buffer);

Review Comment:
   ```suggestion
       assertThat(nettyPayload.getBuffer()).hasValue(buffer);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/NettyPayloadTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyPayload}. */
+class NettyPayloadTest {

Review Comment:
   We should also test some incorrect cases.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {

Review Comment:
   I am a bit uneasy about this class. We rely too much on `ConcurrentHashMap` instead of locks, which means we need to check the code carefully to ensure that compound operations are atomic. Additionally, since some methods have no callers in production code yet, it is not known which threads they will be called from, so it would be best to add comments documenting this.
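
   To illustrate the atomicity concern, here is a minimal sketch. The class and method names are hypothetical and not taken from this PR. Each individual call on a `ConcurrentHashMap` is thread-safe, but a check-then-act sequence built from several calls is not atomic, whereas `computeIfAbsent` does the lookup and the insertion as one atomic step.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical registry, for illustration only; not the PR's actual class. */
final class QueueRegistry {

    private final Map<String, Queue<Integer>> queues = new ConcurrentHashMap<>();

    /** Racy: two threads may both observe null and each install a different queue. */
    Queue<Integer> getOrCreateRacy(String id) {
        Queue<Integer> queue = queues.get(id);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queues.put(id, queue);
        }
        return queue;
    }

    /** Atomic: the mapping function is applied at most once per absent key. */
    Queue<Integer> getOrCreateAtomic(String id) {
        return queues.computeIfAbsent(id, ignored -> new LinkedBlockingQueue<>());
    }
}
```

   If several map entries must change together, a single atomic map call is not enough and an explicit lock is probably the simpler option.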



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223868769


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {

Review Comment:
   I agree with you that the `NextDataType` should be `NONE` when there are `N` buffers. I've modified the code to always set the `NextDataType` to `NONE` for the last buffer.😄
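
   As a rough sketch of that rule: `NextDataTypeSketch` and its simplified `DataType` enum are hypothetical stand-ins rather than the test code in this PR. Each of the `N` queued buffers announces the type of its successor, and the last one announces `NONE`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, for illustration only. */
final class NextDataTypeSketch {

    /** Simplified stand-in for Buffer.DataType; only the values used in the test. */
    enum DataType {
        NONE,
        DATA_BUFFER,
        PRIORITIZED_EVENT_BUFFER
    }

    /** Returns, for each queued buffer, the data type announced for the buffer after it. */
    static List<DataType> nextDataTypes(List<DataType> queued) {
        List<DataType> next = new ArrayList<>(queued.size());
        for (int i = 0; i < queued.size(); i++) {
            boolean isLast = i == queued.size() - 1;
            // The last buffer has no successor, so it always announces NONE.
            next.add(isLast ? DataType.NONE : queued.get(i + 1));
        }
        return next;
    }
}
```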





[GitHub] [flink] xintongsong commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1181454403


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;

Review Comment:
   It should be explicitly documented under what conditions `buffer` or `throwable` can be null, and whether they can both be null or both be non-null at the same time.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;
+
+    public BufferContext(@Nullable Buffer buffer, int bufferIndex, int subpartitionId) {

Review Comment:
   `buffer` should not be nullable. When this constructor is called, the buffer context must represent a buffer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;

Review Comment:
   It should also be documented that, IIUC, these values are invalid if the buffer context represents an error.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;
+
+    public BufferContext(@Nullable Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public BufferContext(@Nullable Throwable throwable) {

Review Comment:
   Same here, `throwable` cannot be null when this constructor is called.
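
   One possible shape for these invariants, as a minimal sketch: `BufferOrError` is a hypothetical stand-in for `BufferContext`, with a `String` playing the role of `Buffer`, and the `-1` sentinel is an assumption. Each constructor rejects null, exactly one of the two fields is non-null, and the index is not readable on an error.

```java
import java.util.Objects;

/** Hypothetical stand-in for BufferContext, for illustration only. */
final class BufferOrError {

    /** Non-null if and only if this instance carries data; then error is null. */
    private final String buffer;

    /** Non-null if and only if this instance carries an error; then buffer is null. */
    private final Throwable error;

    /** Only valid when buffer is non-null; -1 (an assumed sentinel) otherwise. */
    private final int bufferIndex;

    /** Only valid when buffer is non-null; -1 (an assumed sentinel) otherwise. */
    private final int subpartitionId;

    BufferOrError(String buffer, int bufferIndex, int subpartitionId) {
        this.buffer = Objects.requireNonNull(buffer, "buffer must not be null");
        this.error = null;
        this.bufferIndex = bufferIndex;
        this.subpartitionId = subpartitionId;
    }

    BufferOrError(Throwable error) {
        this.buffer = null;
        this.error = Objects.requireNonNull(error, "error must not be null");
        this.bufferIndex = -1;
        this.subpartitionId = -1;
    }

    boolean isError() {
        return error != null;
    }

    int getBufferIndex() {
        if (isError()) {
            throw new IllegalStateException("An error carries no buffer index.");
        }
        return bufferIndex;
    }
}
```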



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/netty/NettyBufferQueue.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.netty;

Review Comment:
   I'd suggest placing `netty` directly under `tiered`, rather than under `storage`.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1212512688


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+    /** The singleton instance of this event. */
+    public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {

Review Comment:
   This value duplicates the one used by `EndOfPartitionEvent`, which may cause some strange errors when used. Please change it to a new random value.
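
   A minimal sketch of the idea, with hypothetical class names and arbitrary constants (neither is taken from Flink): each singleton event type returns its own fixed hash code. Equal hash codes on unequal objects are still legal under the `Object.hashCode` contract, so distinct constants mainly avoid needless collisions and confusion.

```java
/** Hypothetical singleton event; the hash constant is an arbitrary illustrative value. */
final class EndOfPartitionLikeEvent {

    static final EndOfPartitionLikeEvent INSTANCE = new EndOfPartitionLikeEvent();

    private EndOfPartitionLikeEvent() {}

    @Override
    public int hashCode() {
        return 0x1A2B3C4D;
    }

    @Override
    public boolean equals(Object obj) {
        // All instances of a singleton event type are considered equal.
        return obj != null && obj.getClass() == EndOfPartitionLikeEvent.class;
    }
}

/** Hypothetical singleton event with a different arbitrary hash constant. */
final class EndOfSegmentLikeEvent {

    static final EndOfSegmentLikeEvent INSTANCE = new EndOfSegmentLikeEvent();

    private EndOfSegmentLikeEvent() {}

    @Override
    public int hashCode() {
        return 0x5E6F7081;
    }

    @Override
    public boolean equals(Object obj) {
        return obj != null && obj.getClass() == EndOfSegmentLikeEvent.class;
    }
}
```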



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl.TieredStorageNettyServiceImpl;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+
+/** {@link NettyServiceWriter} is used to write buffers to {@link TieredStorageNettyServiceImpl}. */
+public interface NettyServiceWriter {
+    /**
+     * Write a buffer.
+     *
+     * @param bufferContext buffer context represents the buffer.
+     */
+    void writeBuffer(BufferContext bufferContext);
+
+    /**
+     * Get the number of existed buffers in the writer.
+     *
+     * @return the buffer number.
+     */
+    int size();
+
+    /** Clear and recycle all existed buffers in the writer. */

Review Comment:
   existing buffers





[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1184909667


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;
+
+    public BufferContext(@Nullable Buffer buffer, int bufferIndex, int subpartitionId) {

Review Comment:
   Done.





[GitHub] [flink] reswqa commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223798313


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {

Review Comment:
   If that's the case, why is the `NextDataType` of the last one not `NONE` when there are `N` buffers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] WencongLiu commented on pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1583894707

   Thanks for the careful review, @reswqa and @TanYuxin-tyx! 😄 I've made a round of changes; please take a look when you have time!




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209299273


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in Tiered Store shuffle

Review Comment:
   All occurrences of `Tiered Store` should be renamed to `tiered storage` (lowercase).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/ConsumerNettyService.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** {@link ConsumerNettyService} is used to consume buffer from netty client in consumer side. */
+public interface ConsumerNettyService {
+
+    /**
+     * Set up the netty service in consumer side.
+     *
+     * @param inputChannels in consumer side.
+     * @param lastPrioritySequenceNumber is the array to record the priority sequence number.
+     * @param subpartitionAvailableNotifier is used to notify the subpartition is available.
+     */
+    void setup(
+            InputChannel[] inputChannels,
+            int[] lastPrioritySequenceNumber,
+            BiConsumer<Integer, Boolean> subpartitionAvailableNotifier);
+
+    /**
+     * Read a buffer related to the specific subpartition from NettyService.

Review Comment:
   Where is the NettyService? We had better use {@link xxx} here instead; otherwise, when the class is renamed, plain-text mentions like this one may be wrongly overlooked.
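
   To illustrate the suggestion, here is a minimal sketch. The names `PayloadService` and `PayloadReader` are hypothetical stand-ins and are not part of this PR. The Javadoc on `read` refers to the service through `{@link ...}`, so the reference is a real symbol that IDE rename refactorings and Javadoc's reference checks can see, rather than free text.

```java
import java.util.Optional;

/** Hypothetical service type, standing in for whichever class the comment should reference. */
interface PayloadService {
    Optional<String> poll(int subpartitionId);
}

/** Hypothetical reader whose Javadoc links to the type it depends on. */
class PayloadReader {

    private final PayloadService service;

    PayloadReader(PayloadService service) {
        this.service = service;
    }

    /**
     * Read a payload related to the specific subpartition from {@link PayloadService}.
     *
     * @param subpartitionId the subpartition to read from.
     * @return the payload, or empty if nothing is available.
     */
    Optional<String> read(int subpartitionId) {
        return service.poll(subpartitionId);
    }
}
```

   If `PayloadService` were renamed, the `{@link}` target would be updated together with the code by typical refactoring tools, whereas a plain-text class name in the comment would silently go stale.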



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/CreditBasedBufferQueueViewImpl.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The implementation of {@link CreditBasedBufferQueueView}. */
+public class CreditBasedBufferQueueViewImpl implements CreditBasedBufferQueueView {
+
+    private final BufferAvailabilityListener availabilityListener;
+
+    private int consumedBufferIndex = -1;
+
+    @GuardedBy("viewLock")

Review Comment:
   This should be removed.





[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1212845100


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+    /** The singleton instance of this event. */
+    public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {

Review Comment:
   Fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyServiceWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl.TieredStorageNettyServiceImpl;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+
+/** {@link NettyServiceWriter} is used to write buffers to {@link TieredStorageNettyServiceImpl}. */
+public interface NettyServiceWriter {
+    /**
+     * Write a buffer.
+     *
+     * @param bufferContext buffer context represents the buffer.
+     */
+    void writeBuffer(BufferContext bufferContext);
+
+    /**
+     * Get the number of existed buffers in the writer.
+     *
+     * @return the buffer number.
+     */
+    int size();
+
+    /** Clear and recycle all existed buffers in the writer. */

Review Comment:
   Fixed.





[GitHub] [flink] flinkbot commented on pull request #22342: Upstream supports reading buffers from tiered store

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1495623422

   ## CI report:
   
   * d283560c6e4ed1b46db84c7374cc7978bebe37dd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1184909916


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;
+
+    public BufferContext(@Nullable Buffer buffer, int bufferIndex, int subpartitionId) {
+        this.buffer = buffer;
+        this.bufferIndex = bufferIndex;
+        this.subpartitionId = subpartitionId;
+    }
+
+    public BufferContext(@Nullable Throwable throwable) {

Review Comment:
   Done.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/netty/NettyBufferQueue.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.netty;

Review Comment:
   Done.





[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1184909295


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate a error.
+ */
+public class BufferContext {
+
+    @Nullable private Buffer buffer;
+
+    @Nullable private Throwable throwable;
+
+    private int bufferIndex;
+
+    private int subpartitionId;

Review Comment:
   Done.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223761576


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    // ------------------------------------
+    //          For consumer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+            registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<
+                            TieredStorageSubpartitionId,
+                            NettyConnectionReaderAvailabilityAndPriorityHelper>>
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+                    new ConcurrentHashMap<>();
+
+    @Override
+    public void registerProducer(
+            TieredStoragePartitionId partitionId, NettyServiceProducer serviceProducer) {
+        List<NettyServiceProducer> serviceProducers =
+                registeredServiceProducers.getOrDefault(partitionId, new ArrayList<>());
+        serviceProducers.add(serviceProducer);
+        registeredServiceProducers.put(partitionId, serviceProducers);
+    }
+
+    @Override
+    public NettyConnectionReader registerConsumer(
+            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
+        Integer channelIndex = registeredChannelIndexes.get(partitionId).remove(subpartitionId);
+        if (registeredChannelIndexes.get(partitionId).isEmpty()) {
+            registeredChannelIndexes.remove(partitionId);
+        }
+
+        Supplier<InputChannel> inputChannelProvider =
+                registeredInputChannelProviders.get(partitionId).remove(subpartitionId);
+        if (registeredInputChannelProviders.get(partitionId).isEmpty()) {
+            registeredInputChannelProviders.remove(partitionId);
+        }
+
+        NettyConnectionReaderAvailabilityAndPriorityHelper helper =
+                registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                        .get(partitionId)
+                        .remove(subpartitionId);
+        if (registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                .get(partitionId)
+                .isEmpty()) {
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers.remove(partitionId);
+        }
+        return new NettyConnectionReaderImpl(channelIndex, inputChannelProvider, helper);
+    }
+
+    /**
+     * Create a {@link ResultSubpartitionView} for the netty server.
+     *
+     * @param partitionId partition id indicates the unique id of {@link TieredResultPartition}.
+     * @param subpartitionId subpartition id indicates the unique id of subpartition.
+     * @param availabilityListener listener is used to listen the available status of data.
+     * @return the {@link TieredStoreResultSubpartitionView}.
+     */
+    public ResultSubpartitionView createResultSubpartitionView(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            BufferAvailabilityListener availabilityListener) {
+        List<NettyServiceProducer> serviceProducers = registeredServiceProducers.get(partitionId);
+        if (serviceProducers == null) {
+            return new TieredStoreResultSubpartitionView(
+                    availabilityListener, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+        }
+        List<Queue<NettyPayload>> queues = new ArrayList<>();
+        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
+        for (NettyServiceProducer serviceProducer : serviceProducers) {
+            LinkedBlockingQueue<NettyPayload> queue = new LinkedBlockingQueue<>();
+            NettyConnectionWriterImpl writer = new NettyConnectionWriterImpl(queue);
+            serviceProducer.connectionEstablished(subpartitionId, writer);
+            nettyConnectionIds.add(writer.getNettyConnectionId());
+            queues.add(queue);
+            registeredAvailabilityListeners.put(
+                    writer.getNettyConnectionId(), availabilityListener);
+        }
+        return new TieredStoreResultSubpartitionView(
+                availabilityListener,
+                queues,
+                nettyConnectionIds,
+                registeredServiceProducers.get(partitionId));
+    }
+
+    /**
+     * Notify the {@link ResultSubpartitionView} to send buffer.
+     *
+     * @param connectionId connection id indicates the id of connection.
+     */
+    public void notifyResultSubpartitionViewSendBuffer(NettyConnectionId connectionId) {
+        BufferAvailabilityListener listener = registeredAvailabilityListeners.get(connectionId);
+        if (listener != null) {
+            listener.notifyDataAvailable();
+        }
+    }
+
+    /**
+     * Set up input channels in {@link SingleInputGate}.
+     *
+     * @param partitionIds partition ids indicates the ids of {@link TieredResultPartition}.
+     * @param subpartitionIds subpartition ids indicates the ids of subpartition.
+     */
+    public void setUpInputChannels(
+            TieredStoragePartitionId[] partitionIds,
+            TieredStorageSubpartitionId[] subpartitionIds,
+            List<Supplier<InputChannel>> inputChannelProviders,
+            NettyConnectionReaderAvailabilityAndPriorityHelper helper) {
+        checkState(partitionIds.length == subpartitionIds.length);
+        checkState(subpartitionIds.length == inputChannelProviders.size());
+        for (int index = 0; index < partitionIds.length; ++index) {
+            TieredStoragePartitionId partitionId = partitionIds[index];
+            TieredStorageSubpartitionId subpartitionId = subpartitionIds[index];
+
+            Map<TieredStorageSubpartitionId, Integer> channelIndexes =
+                    registeredChannelIndexes.getOrDefault(partitionId, new ConcurrentHashMap<>());

Review Comment:
   Maybe we can simplify this with the `registeredChannelIndexes.computeIfAbsent(...).put(...)` pattern.
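
A minimal sketch of what this suggestion could look like, not the PR's actual code. The quoted hunk is cut off after the `getOrDefault` call, so the sketch assumes the loop goes on to put the channel index into the inner map and write that map back. `ChannelIndexRegistry` and its `String` keys are hypothetical stand-ins for `TieredStoragePartitionId` and `TieredStorageSubpartitionId`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the consumer-side bookkeeping; String keys replace the real id types. */
public class ChannelIndexRegistry {

    private final Map<String, Map<String, Integer>> registeredChannelIndexes =
            new ConcurrentHashMap<>();

    /** Assumed shape of the current code: fetch or create, mutate, then write back. */
    public void registerWithGetOrDefault(String partitionId, String subpartitionId, int index) {
        Map<String, Integer> channelIndexes =
                registeredChannelIndexes.getOrDefault(partitionId, new ConcurrentHashMap<>());
        channelIndexes.put(subpartitionId, index);
        // Without this write-back a freshly created inner map would be lost.
        registeredChannelIndexes.put(partitionId, channelIndexes);
    }

    /** Suggested shape: the inner map is created and registered in a single call. */
    public void registerWithComputeIfAbsent(String partitionId, String subpartitionId, int index) {
        registeredChannelIndexes
                .computeIfAbsent(partitionId, ignored -> new ConcurrentHashMap<>())
                .put(subpartitionId, index);
    }

    /** Returns the registered channel index, or null if nothing is registered. */
    public Integer lookup(String partitionId, String subpartitionId) {
        Map<String, Integer> channelIndexes = registeredChannelIndexes.get(partitionId);
        return channelIndexes == null ? null : channelIndexes.get(subpartitionId);
    }
}
```

On a `ConcurrentHashMap`, `computeIfAbsent` creates and registers the inner map atomically, so two threads registering the same partition cannot overwrite each other's inner map, which the get-then-put sequence does not guarantee.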



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    // ------------------------------------
+    //          For consumer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+            registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<
+                            TieredStorageSubpartitionId,
+                            NettyConnectionReaderAvailabilityAndPriorityHelper>>
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+                    new ConcurrentHashMap<>();
+
+    @Override
+    public void registerProducer(
+            TieredStoragePartitionId partitionId, NettyServiceProducer serviceProducer) {
+        List<NettyServiceProducer> serviceProducers =
+                registeredServiceProducers.getOrDefault(partitionId, new ArrayList<>());
+        serviceProducers.add(serviceProducer);
+        registeredServiceProducers.put(partitionId, serviceProducers);
+    }
+
+    @Override
+    public NettyConnectionReader registerConsumer(
+            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
+        Integer channelIndex = registeredChannelIndexes.get(partitionId).remove(subpartitionId);
+        if (registeredChannelIndexes.get(partitionId).isEmpty()) {
+            registeredChannelIndexes.remove(partitionId);
+        }
+
+        Supplier<InputChannel> inputChannelProvider =
+                registeredInputChannelProviders.get(partitionId).remove(subpartitionId);
+        if (registeredInputChannelProviders.get(partitionId).isEmpty()) {
+            registeredInputChannelProviders.remove(partitionId);
+        }
+
+        NettyConnectionReaderAvailabilityAndPriorityHelper helper =
+                registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                        .get(partitionId)
+                        .remove(subpartitionId);
+        if (registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                .get(partitionId)
+                .isEmpty()) {
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers.remove(partitionId);
+        }
+        return new NettyConnectionReaderImpl(channelIndex, inputChannelProvider, helper);
+    }
+
+    /**
+     * Create a {@link ResultSubpartitionView} for the netty server.
+     *
+     * @param partitionId partition id indicates the unique id of {@link TieredResultPartition}.
+     * @param subpartitionId subpartition id indicates the unique id of subpartition.
+     * @param availabilityListener listener is used to listen the available status of data.
+     * @return the {@link TieredStoreResultSubpartitionView}.
+     */
+    public ResultSubpartitionView createResultSubpartitionView(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            BufferAvailabilityListener availabilityListener) {
+        List<NettyServiceProducer> serviceProducers = registeredServiceProducers.get(partitionId);
+        if (serviceProducers == null) {
+            return new TieredStoreResultSubpartitionView(
+                    availabilityListener, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+        }
+        List<Queue<NettyPayload>> queues = new ArrayList<>();
+        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
+        for (NettyServiceProducer serviceProducer : serviceProducers) {
+            LinkedBlockingQueue<NettyPayload> queue = new LinkedBlockingQueue<>();
+            NettyConnectionWriterImpl writer = new NettyConnectionWriterImpl(queue);
+            serviceProducer.connectionEstablished(subpartitionId, writer);
+            nettyConnectionIds.add(writer.getNettyConnectionId());
+            queues.add(queue);
+            registeredAvailabilityListeners.put(
+                    writer.getNettyConnectionId(), availabilityListener);
+        }
+        return new TieredStoreResultSubpartitionView(
+                availabilityListener,
+                queues,
+                nettyConnectionIds,
+                registeredServiceProducers.get(partitionId));
+    }
+
+    /**
+     * Notify the {@link ResultSubpartitionView} to send buffer.
+     *
+     * @param connectionId connection id indicates the id of connection.
+     */
+    public void notifyResultSubpartitionViewSendBuffer(NettyConnectionId connectionId) {
+        BufferAvailabilityListener listener = registeredAvailabilityListeners.get(connectionId);
+        if (listener != null) {
+            listener.notifyDataAvailable();
+        }
+    }
+
+    /**
+     * Set up input channels in {@link SingleInputGate}.
+     *
+     * @param partitionIds partition ids indicates the ids of {@link TieredResultPartition}.
+     * @param subpartitionIds subpartition ids indicates the ids of subpartition.
+     */
+    public void setUpInputChannels(

Review Comment:
   ```suggestion
       public void setupInputChannels(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishConsumer;
+
+    private final Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+    public TestingNettyServiceProducer(

Review Comment:
   We should make this constructor private so that it cannot be called directly. Test objects should be created through the Builder instead.
   
   There are many similar cases; we should change them as well.
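
A rough sketch of the shape this comment asks for, under stated assumptions: `TestingProducerSketch`, its setter names, and the `Integer`/`String` parameter types are hypothetical placeholders for `TestingNettyServiceProducer`, `TieredStorageSubpartitionId`, `NettyConnectionWriter` and `NettyConnectionId`. The Builder in the actual PR may look different.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical test double whose constructor is reachable only through its Builder. */
public class TestingProducerSketch {

    private final BiConsumer<Integer, String> connectionEstablishConsumer;
    private final Consumer<String> connectionBrokenConsumer;

    // Private: tests must go through builder(), so every field always has a sensible default.
    private TestingProducerSketch(
            BiConsumer<Integer, String> connectionEstablishConsumer,
            Consumer<String> connectionBrokenConsumer) {
        this.connectionEstablishConsumer = connectionEstablishConsumer;
        this.connectionBrokenConsumer = connectionBrokenConsumer;
    }

    public void connectionEstablished(int subpartitionId, String writer) {
        connectionEstablishConsumer.accept(subpartitionId, writer);
    }

    public void connectionBroken(String connectionId) {
        connectionBrokenConsumer.accept(connectionId);
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Builder with no-op defaults; a test overrides only the callbacks it cares about. */
    public static class Builder {
        private BiConsumer<Integer, String> connectionEstablishConsumer = (id, writer) -> {};
        private Consumer<String> connectionBrokenConsumer = id -> {};

        private Builder() {}

        public Builder setConnectionEstablishConsumer(BiConsumer<Integer, String> consumer) {
            this.connectionEstablishConsumer = consumer;
            return this;
        }

        public Builder setConnectionBrokenConsumer(Consumer<String> consumer) {
            this.connectionBrokenConsumer = consumer;
            return this;
        }

        public TestingProducerSketch build() {
            return new TestingProducerSketch(connectionEstablishConsumer, connectionBrokenConsumer);
        }
    }
}
```

With this shape a test that only cares about broken connections sets that single callback and leaves the rest at their no-op defaults, and adding another callback later does not change any existing call site.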



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
+        assertThat(bufferAndBacklog).isNotNull();
+        assertThat(bufferAndBacklog.buffer()).isNotNull();
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
+    }
+
+    private BufferAvailabilityListener createBufferAvailabilityListener(
+            CompletableFuture<Void> notifier) {
+        return () -> notifier.complete(null);
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueues() {

Review Comment:
   ```suggestion
       private static List<Queue<NettyPayload>> createNettyPayloadQueues() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
+        assertThat(bufferAndBacklog).isNotNull();
+        assertThat(bufferAndBacklog.buffer()).isNotNull();
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
+    }
+
+    private BufferAvailabilityListener createBufferAvailabilityListener(
+            CompletableFuture<Void> notifier) {
+        return () -> notifier.complete(null);
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueues() {
+        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            Queue<NettyPayload> queue = new ArrayDeque<>();
+            queue.add(NettyPayload.newSegment(index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE),
+                            0,
+                            index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                    END_OF_SEGMENT),
+                            1,
+                            index));
+            nettyPayloadQueues.add(queue);
+        }
+        return nettyPayloadQueues;
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueuesWithError(Throwable error) {
+        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            Queue<NettyPayload> queue = new ArrayDeque<>();
+            queue.add(NettyPayload.newSegment(index));
+            queue.add(NettyPayload.newError(error));
+            nettyPayloadQueues.add(queue);
+        }
+        return nettyPayloadQueues;
+    }
+
+    private List<NettyConnectionId> createNettyConnectionIds() {
+        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            nettyConnectionIds.add(NettyConnectionId.newId());
+        }
+        return nettyConnectionIds;
+    }
+
+    private List<NettyServiceProducer> createNettyServiceProducers(

Review Comment:
   ```suggestion
       private static List<NettyServiceProducer> createNettyServiceProducers(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
+        assertThat(bufferAndBacklog).isNotNull();
+        assertThat(bufferAndBacklog.buffer()).isNotNull();
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
+    }
+
+    private BufferAvailabilityListener createBufferAvailabilityListener(

Review Comment:
   ```suggestion
       private static BufferAvailabilityListener createBufferAvailabilityListener(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterTest {
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testWriteBuffer() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyPayloadQueue.size()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testGetNettyConnectionId() {
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
+    }
+
+    @Test
+    void testNumQueuedBuffers() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
+    }
+
+    @Test
+    void testClose() {
+        int bufferNumber = 10;
+        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyConnectionWriter nettyConnectionWriter =
+                new NettyConnectionWriterImpl(nettyPayloadQueue);
+        writeBufferTpWriter(bufferNumber, nettyConnectionWriter);
+        nettyConnectionWriter.close();
+        assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0);
+    }
+
+    private void writeBufferTpWriter(
+            int bufferNumber, NettyConnectionWriter nettyConnectionWriter) {
+        for (int index = 0; index < bufferNumber; ++index) {
+            nettyConnectionWriter.writeBuffer(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE),

Review Comment:
   Please check all the other recyclers like this one as well.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {

Review Comment:
   ```suggestion
       private static void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
   ```
   
   I checked all the private methods in the test classes, and they should all be static.
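
   To illustrate the point, here is a minimal sketch rather than code from this PR. `StaticHelperSketch`, `Listener`, `createListener` and `createIds` are hypothetical stand-ins (`Listener` plays the role of `BufferAvailabilityListener`). The helpers read only their parameters and no instance fields, so they can be declared `static`. They are package-private here only so the sketch can be called from outside; in the test classes they would be `private static`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical stand-in for a test class; not part of the PR. */
   class StaticHelperSketch {

       /** Hypothetical stand-in for BufferAvailabilityListener. */
       interface Listener {
           void notifyDataAvailable();
       }

       // Depends only on its parameter, so it does not need an instance.
       static Listener createListener(CompletableFuture<Void> notifier) {
           return () -> notifier.complete(null);
       }

       // Builds a fresh list without touching any field, so it can be static too.
       static List<Integer> createIds(int count) {
           List<Integer> ids = new ArrayList<>();
           for (int index = 0; index < count; ++index) {
               ids.add(index);
           }
           return ids;
       }

       public static void main(String[] args) {
           CompletableFuture<Void> notifier = new CompletableFuture<>();
           Listener listener = createListener(notifier);
           listener.notifyDataAvailable();
           System.out.println(notifier.isDone()); // prints true
       }
   }
   ```

   Declaring such helpers `static` makes it explicit that they do not depend on the per-test state set up in `@BeforeEach`.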



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
+
+    // ------------------------------------
+    //          For producer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>>
+            registeredServiceProducers = new ConcurrentHashMap<>();
+
+    private final Map<NettyConnectionId, BufferAvailabilityListener>
+            registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+    // ------------------------------------
+    //          For consumer side
+    // ------------------------------------
+
+    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Integer>>
+            registeredChannelIndexes = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<TieredStorageSubpartitionId, Supplier<InputChannel>>>
+            registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+    private final Map<
+                    TieredStoragePartitionId,
+                    Map<
+                            TieredStorageSubpartitionId,
+                            NettyConnectionReaderAvailabilityAndPriorityHelper>>
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+                    new ConcurrentHashMap<>();
+
+    @Override
+    public void registerProducer(
+            TieredStoragePartitionId partitionId, NettyServiceProducer serviceProducer) {
+        List<NettyServiceProducer> serviceProducers =
+                registeredServiceProducers.getOrDefault(partitionId, new ArrayList<>());
+        serviceProducers.add(serviceProducer);
+        registeredServiceProducers.put(partitionId, serviceProducers);
+    }
+
+    @Override
+    public NettyConnectionReader registerConsumer(
+            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
+        Integer channelIndex = registeredChannelIndexes.get(partitionId).remove(subpartitionId);
+        if (registeredChannelIndexes.get(partitionId).isEmpty()) {
+            registeredChannelIndexes.remove(partitionId);
+        }
+
+        Supplier<InputChannel> inputChannelProvider =
+                registeredInputChannelProviders.get(partitionId).remove(subpartitionId);
+        if (registeredInputChannelProviders.get(partitionId).isEmpty()) {
+            registeredInputChannelProviders.remove(partitionId);
+        }
+
+        NettyConnectionReaderAvailabilityAndPriorityHelper helper =
+                registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                        .get(partitionId)
+                        .remove(subpartitionId);
+        if (registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                .get(partitionId)
+                .isEmpty()) {
+            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers.remove(partitionId);
+        }
+        return new NettyConnectionReaderImpl(channelIndex, inputChannelProvider, helper);
+    }
+
+    /**
+     * Create a {@link ResultSubpartitionView} for the netty server.
+     *
+     * @param partitionId partition id indicates the unique id of {@link TieredResultPartition}.
+     * @param subpartitionId subpartition id indicates the unique id of subpartition.
+     * @param availabilityListener listener is used to listen the available status of data.
+     * @return the {@link TieredStoreResultSubpartitionView}.
+     */
+    public ResultSubpartitionView createResultSubpartitionView(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            BufferAvailabilityListener availabilityListener) {
+        List<NettyServiceProducer> serviceProducers = registeredServiceProducers.get(partitionId);
+        if (serviceProducers == null) {
+            return new TieredStoreResultSubpartitionView(
+                    availabilityListener, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+        }
+        List<Queue<NettyPayload>> queues = new ArrayList<>();
+        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
+        for (NettyServiceProducer serviceProducer : serviceProducers) {
+            LinkedBlockingQueue<NettyPayload> queue = new LinkedBlockingQueue<>();
+            NettyConnectionWriterImpl writer = new NettyConnectionWriterImpl(queue);
+            serviceProducer.connectionEstablished(subpartitionId, writer);
+            nettyConnectionIds.add(writer.getNettyConnectionId());
+            queues.add(queue);
+            registeredAvailabilityListeners.put(
+                    writer.getNettyConnectionId(), availabilityListener);
+        }
+        return new TieredStoreResultSubpartitionView(
+                availabilityListener,
+                queues,
+                nettyConnectionIds,
+                registeredServiceProducers.get(partitionId));
+    }
+
+    /**
+     * Notify the {@link ResultSubpartitionView} to send buffer.
+     *
+     * @param connectionId connection id indicates the id of connection.
+     */
+    public void notifyResultSubpartitionViewSendBuffer(NettyConnectionId connectionId) {
+        BufferAvailabilityListener listener = registeredAvailabilityListeners.get(connectionId);
+        if (listener != null) {
+            listener.notifyDataAvailable();
+        }
+    }
+
+    /**
+     * Set up input channels in {@link SingleInputGate}.
+     *
+     * @param partitionIds partition ids indicates the ids of {@link TieredResultPartition}.
+     * @param subpartitionIds subpartition ids indicates the ids of subpartition.
+     */
+    public void setUpInputChannels(
+            TieredStoragePartitionId[] partitionIds,
+            TieredStorageSubpartitionId[] subpartitionIds,
+            List<Supplier<InputChannel>> inputChannelProviders,
+            NettyConnectionReaderAvailabilityAndPriorityHelper helper) {
+        checkState(partitionIds.length == subpartitionIds.length);
+        checkState(subpartitionIds.length == inputChannelProviders.size());
+        for (int index = 0; index < partitionIds.length; ++index) {
+            TieredStoragePartitionId partitionId = partitionIds[index];
+            TieredStorageSubpartitionId subpartitionId = subpartitionIds[index];
+
+            Map<TieredStorageSubpartitionId, Integer> channelIndexes =
+                    registeredChannelIndexes.getOrDefault(partitionId, new ConcurrentHashMap<>());
+            channelIndexes.put(subpartitionId, index);
+            registeredChannelIndexes.put(partitionId, channelIndexes);
+
+            Map<TieredStorageSubpartitionId, Supplier<InputChannel>> providers =
+                    registeredInputChannelProviders.getOrDefault(
+                            partitionId, new ConcurrentHashMap<>());
+            providers.put(subpartitionId, inputChannelProviders.get(index));
+            registeredInputChannelProviders.put(partitionId, providers);
+
+            Map<TieredStorageSubpartitionId, NettyConnectionReaderAvailabilityAndPriorityHelper>
+                    helpers =
+                            registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+                                    .getOrDefault(partitionId, new ConcurrentHashMap<>());

Review Comment:
   ditto



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
+        assertThat(bufferAndBacklog).isNotNull();
+        assertThat(bufferAndBacklog.buffer()).isNotNull();
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
+    }
+
+    private BufferAvailabilityListener createBufferAvailabilityListener(
+            CompletableFuture<Void> notifier) {
+        return () -> notifier.complete(null);
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueues() {
+        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            Queue<NettyPayload> queue = new ArrayDeque<>();
+            queue.add(NettyPayload.newSegment(index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE),
+                            0,
+                            index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                    END_OF_SEGMENT),
+                            1,
+                            index));
+            nettyPayloadQueues.add(queue);
+        }
+        return nettyPayloadQueues;
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueuesWithError(Throwable error) {
+        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            Queue<NettyPayload> queue = new ArrayDeque<>();
+            queue.add(NettyPayload.newSegment(index));
+            queue.add(NettyPayload.newError(error));
+            nettyPayloadQueues.add(queue);
+        }
+        return nettyPayloadQueues;
+    }
+
+    private List<NettyConnectionId> createNettyConnectionIds() {

Review Comment:
   ```suggestion
       private static List<NettyConnectionId> createNettyConnectionIds() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoreResultSubpartitionViewTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStoreResultSubpartitionView}. */
+public class TieredStoreResultSubpartitionViewTest {
+
+    private static final int TIER_NUMBER = 2;
+
+    private CompletableFuture<Void> availabilityListener;
+
+    private List<Queue<NettyPayload>> nettyPayloadQueues;
+
+    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
+
+    private TieredStoreResultSubpartitionView tieredStoreResultSubpartitionView;
+
+    @BeforeEach
+    void before() {
+        availabilityListener = new CompletableFuture<>();
+        nettyPayloadQueues = createNettyPayloadQueues();
+        connectionBrokenConsumers =
+                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+    }
+
+    @Test
+    void testGetNextBuffer() throws IOException {
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener).isDone();
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 1);
+        checkBufferAndBacklog(tieredStoreResultSubpartitionView.getNextBuffer(), 0);
+        assertThat(tieredStoreResultSubpartitionView.getNextBuffer()).isNull();
+    }
+
+    @Test
+    void testGetNextBufferFailed() {
+        Throwable expectedError = new IOException();
+        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        tieredStoreResultSubpartitionView =
+                new TieredStoreResultSubpartitionView(
+                        createBufferAvailabilityListener(availabilityListener),
+                        nettyPayloadQueues,
+                        createNettyConnectionIds(),
+                        createNettyServiceProducers(connectionBrokenConsumers));
+        assertThatThrownBy(tieredStoreResultSubpartitionView::getNextBuffer)
+                .hasCause(expectedError);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+    }
+
+    @Test
+    void testGetAvailabilityAndBacklog() {
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(0);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
+        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
+                tieredStoreResultSubpartitionView.getAvailabilityAndBacklog(2);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
+    }
+
+    @Test
+    void testNotifyRequiredSegmentId() {
+        tieredStoreResultSubpartitionView.notifyRequiredSegmentId(1);
+        assertThat(availabilityListener.isDone()).isTrue();
+    }
+
+    @Test
+    void testReleaseAllResources() throws IOException {
+        tieredStoreResultSubpartitionView.releaseAllResources();
+        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
+        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(connectionBrokenConsumers.get(0).isDone()).isTrue();
+        assertThat(connectionBrokenConsumers.get(1).isDone()).isTrue();
+        assertThat(tieredStoreResultSubpartitionView.isReleased()).isTrue();
+    }
+
+    @Test
+    void testGetNumberOfQueuedBuffers() {
+        assertThat(tieredStoreResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        assertThat(tieredStoreResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(3);
+    }
+
+    private void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) {
+        assertThat(bufferAndBacklog).isNotNull();
+        assertThat(bufferAndBacklog.buffer()).isNotNull();
+        assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
+    }
+
+    private BufferAvailabilityListener createBufferAvailabilityListener(
+            CompletableFuture<Void> notifier) {
+        return () -> notifier.complete(null);
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueues() {
+        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+        for (int index = 0; index < TIER_NUMBER; ++index) {
+            Queue<NettyPayload> queue = new ArrayDeque<>();
+            queue.add(NettyPayload.newSegment(index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE),
+                            0,
+                            index));
+            queue.add(
+                    NettyPayload.newBuffer(
+                            new NetworkBuffer(
+                                    MemorySegmentFactory.allocateUnpooledSegment(0),
+                                    BufferRecycler.DummyBufferRecycler.INSTANCE,
+                                    END_OF_SEGMENT),
+                            1,
+                            index));
+            nettyPayloadQueues.add(queue);
+        }
+        return nettyPayloadQueues;
+    }
+
+    private List<Queue<NettyPayload>> createNettyPayloadQueuesWithError(Throwable error) {

Review Comment:
   ```suggestion
       private static List<Queue<NettyPayload>> createNettyPayloadQueuesWithError(Throwable error) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa merged pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa merged PR #22342:
URL: https://github.com/apache/flink/pull/22342




[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209305502


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in Tiered Store shuffle

Review Comment:
   Done.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1212512688


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in tiered storage shuffle
+ * mode.
+ */
+@Internal
+public class EndOfSegmentEvent extends RuntimeEvent {
+
+    /** The singleton instance of this event. */
+    public static final EndOfSegmentEvent INSTANCE = new EndOfSegmentEvent();
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        throw new UnsupportedOperationException("This method should never be called");
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {

Review Comment:
   This hash code, 1965146673, duplicates the value used by `EndOfPartitionEvent`, which may cause strange errors when both events are in use. Please change it to a new random value.
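
A minimal sketch of the requested change, assuming two singleton event classes that each return a fixed hash code. The class names `EndOfPartitionSketch` and `EndOfSegmentSketch` and the replacement constant are hypothetical and are not taken from the PR. Java permits unequal objects to share a hash code, so the sketch only illustrates the convention of giving each singleton its own constant.

```java
// Hypothetical stand-ins for two singleton runtime events; not Flink's classes.
final class EndOfPartitionSketch {
    static final EndOfPartitionSketch INSTANCE = new EndOfPartitionSketch();

    private EndOfPartitionSketch() {}

    @Override
    public int hashCode() {
        return 1965146673; // the value quoted in the review comment
    }

    @Override
    public boolean equals(Object obj) {
        return obj != null && obj.getClass() == EndOfPartitionSketch.class;
    }
}

final class EndOfSegmentSketch {
    static final EndOfSegmentSketch INSTANCE = new EndOfSegmentSketch();

    private EndOfSegmentSketch() {}

    @Override
    public int hashCode() {
        return 1965146672; // arbitrary distinct constant, chosen only for this sketch
    }

    @Override
    public boolean equals(Object obj) {
        return obj != null && obj.getClass() == EndOfSegmentSketch.class;
    }
}

final class EventHashDemo {
    public static void main(String[] args) {
        boolean distinct =
                EndOfPartitionSketch.INSTANCE.hashCode()
                        != EndOfSegmentSketch.INSTANCE.hashCode();
        System.out.println("distinct hash codes: " + distinct);
    }
}
```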





[GitHub] [flink] xintongsong commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1218798506


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStoragePartitionIdAndSubpartitionId.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+/** The combination of {@link TieredStoragePartitionId} and {@link TieredStorageSubpartitionId}. */
+public class TieredStoragePartitionIdAndSubpartitionId {

Review Comment:
   1. This should belong to the `common` package
   2. Might make sense to implement `TieredStorageDataIdentifier`
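
A rough sketch of the second point, assuming the identifier interface is a plain marker type. `DataIdentifierSketch` stands in for `TieredStorageDataIdentifier`, whose real shape is not shown in this thread, and the `String` partition id is a simplification of `TieredStoragePartitionId`. All names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical marker interface standing in for TieredStorageDataIdentifier.
interface DataIdentifierSketch {}

// Hypothetical composite identifier: a partition id plus a subpartition id.
final class PartitionAndSubpartitionIdSketch implements DataIdentifierSketch {
    private final String partitionId;
    private final int subpartitionId;

    PartitionAndSubpartitionIdSketch(String partitionId, int subpartitionId) {
        this.partitionId = Objects.requireNonNull(partitionId);
        this.subpartitionId = subpartitionId;
    }

    String getPartitionId() {
        return partitionId;
    }

    int getSubpartitionId() {
        return subpartitionId;
    }

    // Value semantics so the identifier can be used as a map key.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PartitionAndSubpartitionIdSketch)) {
            return false;
        }
        PartitionAndSubpartitionIdSketch that = (PartitionAndSubpartitionIdSketch) obj;
        return subpartitionId == that.subpartitionId && partitionId.equals(that.partitionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(partitionId, subpartitionId);
    }

    @Override
    public String toString() {
        return "PartitionAndSubpartitionIdSketch{" + partitionId + ", " + subpartitionId + "}";
    }
}

final class CompositeIdDemo {
    public static void main(String[] args) {
        // The composite id is usable wherever the generic identifier type is expected.
        Map<DataIdentifierSketch, String> registry = new HashMap<>();
        registry.put(new PartitionAndSubpartitionIdSketch("partition-a", 0), "reader-0");
        System.out.println(registry.get(new PartitionAndSubpartitionIdSketch("partition-a", 0)));
    }
}
```

Implementing the shared identifier type would let code that is keyed on generic identifiers accept the composite id without a separate overload.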



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer.

Review Comment:
   ```suggestion
        * Write a buffer to the netty connection.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer.
+     *
+     * @param bufferContext buffer context represents the buffer.
+     */
+    void writeBuffer(BufferContext bufferContext);
+
+    /**
+     * Get the number of existed buffers in the writer.
+     *
+     * @return the buffer number.
+     */
+    int size();

Review Comment:
   ```suggestion
       int numQueuedBuffers();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate an error or a segment id.
+ */
+public class BufferContext {

Review Comment:
   This is not a tiered storage concept. Maybe it belongs to the netty package.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate an error or a segment id.
+ */
+public class BufferContext {
+
+    private Buffer buffer;
+
+    private Throwable error;
+
+    private int bufferIndex;
+
+    private int subpartitionId;
+
+    private int segmentId = -1;

Review Comment:
   1. Need javadoc.
   2. Should these be final?
   3. Some of the fields should be `@Nullable`, and the Javadoc should explain the semantics of null values.
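
The three points above could be addressed along the following lines. This is only a sketch under stated assumptions: `PayloadSketch` is a hypothetical class, a `byte[]` stands in for Flink's `Buffer`, and `Optional` accessors replace the `@Nullable` annotation so the example needs nothing beyond the JDK. The factory names `newBuffer` and `newSegment` mirror the calls visible in the test quoted earlier in this thread; `newError` is an assumption.

```java
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical immutable payload. Exactly one of the three variants is set:
 * a data buffer, an error, or a segment id.
 */
final class PayloadSketch {
    /** Null unless this payload carries data. */
    private final byte[] buffer;

    /** Null unless this payload reports a failure. */
    private final Throwable error;

    /** Index of the buffer in its subpartition, or -1 if there is no buffer. */
    private final int bufferIndex;

    /** Subpartition the buffer belongs to, or -1 if there is no buffer. */
    private final int subpartitionId;

    /** Segment id announced by this payload, or -1 if it is not a segment notification. */
    private final int segmentId;

    private PayloadSketch(
            byte[] buffer, Throwable error, int bufferIndex, int subpartitionId, int segmentId) {
        this.buffer = buffer;
        this.error = error;
        this.bufferIndex = bufferIndex;
        this.subpartitionId = subpartitionId;
        this.segmentId = segmentId;
    }

    static PayloadSketch newBuffer(byte[] buffer, int bufferIndex, int subpartitionId) {
        return new PayloadSketch(
                Objects.requireNonNull(buffer), null, bufferIndex, subpartitionId, -1);
    }

    static PayloadSketch newError(Throwable error) {
        return new PayloadSketch(null, Objects.requireNonNull(error), -1, -1, -1);
    }

    static PayloadSketch newSegment(int segmentId) {
        return new PayloadSketch(null, null, -1, -1, segmentId);
    }

    Optional<byte[]> getBuffer() {
        return Optional.ofNullable(buffer);
    }

    Optional<Throwable> getError() {
        return Optional.ofNullable(error);
    }

    int getBufferIndex() {
        return bufferIndex;
    }

    int getSubpartitionId() {
        return subpartitionId;
    }

    int getSegmentId() {
        return segmentId;
    }
}

final class PayloadDemo {
    public static void main(String[] args) {
        PayloadSketch segment = PayloadSketch.newSegment(3);
        System.out.println("carries buffer: " + segment.getBuffer().isPresent());
        System.out.println("segment id: " + segment.getSegmentId());
    }
}
```

Private construction plus static factories keeps every field final and makes it impossible to build a payload that mixes variants.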



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer.
+     *
+     * @param bufferContext buffer context represents the buffer.
+     */
+    void writeBuffer(BufferContext bufferContext);
+
+    /**
+     * Get the number of existed buffers in the writer.
+     *
+     * @return the buffer number.
+     */
+    int size();

Review Comment:
   We should explain in the class Javadoc that buffers are first queued locally before being sent to the receiver.
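
To illustrate the queue-then-send behaviour described above, here is a small sketch. The interface and class names are hypothetical, a `byte[]` stands in for the buffer context, and the `poll` method is an assumed hook for whatever transport drains the queue. The method name `numQueuedBuffers` follows the rename suggested earlier in this review.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical writer interface. Buffers handed to the writer are first queued
 * locally and are only sent to the receiver when the transport polls them.
 */
interface ConnectionWriterSketch {
    /** Queue a buffer for the connection. */
    void writeBuffer(byte[] buffer);

    /** Number of buffers queued locally and not yet sent. */
    int numQueuedBuffers();

    /** Close the connection and release all queued buffers. */
    void close();
}

final class QueueingWriterSketch implements ConnectionWriterSketch {
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private boolean closed;

    @Override
    public void writeBuffer(byte[] buffer) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        queue.add(buffer);
    }

    @Override
    public int numQueuedBuffers() {
        return queue.size();
    }

    /** Assumed transport-side hook: returns the next queued buffer, or null if none. */
    byte[] poll() {
        return queue.poll();
    }

    @Override
    public void close() {
        closed = true;
        queue.clear();
    }
}

final class WriterDemo {
    public static void main(String[] args) {
        QueueingWriterSketch writer = new QueueingWriterSketch();
        writer.writeBuffer(new byte[] {1});
        writer.writeBuffer(new byte[] {2});
        System.out.println("queued before send: " + writer.numQueuedBuffers());
        writer.poll();
        System.out.println("queued after one send: " + writer.numQueuedBuffers());
    }
}
```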



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** {@link NettyConnectionWriter} is used by {@link TierProducerAgent} to write buffer to netty. */
+public interface NettyConnectionWriter {
+    /**
+     * Write a buffer.
+     *
+     * @param bufferContext buffer context represents the buffer.
+     */
+    void writeBuffer(BufferContext bufferContext);
+
+    /**
+     * Get the number of existed buffers in the writer.
+     *
+     * @return the buffer number.
+     */
+    int size();
+
+    /** Close the writer and recycle all buffers. */

Review Comment:
   ```suggestion
       /** Close the connection and release all resources. */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReader.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+
+import java.util.Optional;
+
+/** {@link NettyConnectionReader} is used by {@link TierConsumerAgent} to read buffer from netty. */
+public interface NettyConnectionReader {
+    /**
+     * Read a buffer.
+     *
+     * @param segmentId segment id indicates the id of segment.
+     * @return a buffer.
+     */
+    Optional<Buffer> readBuffer(int segmentId);

Review Comment:
   What does it mean if an empty optional is returned?
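
One possible way to answer this in the Javadoc is sketched below. The contract shown, where an empty result means "no buffer is available right now, try again later", is an assumption made for illustration and is not confirmed anywhere in this thread. The type names and the `onBufferReceived` hook are hypothetical, and a `byte[]` stands in for `Buffer`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical reader interface with the empty case spelled out. */
interface ConnectionReaderSketch {
    /**
     * Read the next buffer of the given segment.
     *
     * @param segmentId id of the segment the caller wants to read.
     * @return the next buffer, or an empty optional if no buffer is currently
     *     available (assumed meaning: the caller may retry later).
     */
    Optional<byte[]> readBuffer(int segmentId);
}

final class QueueBackedReaderSketch implements ConnectionReaderSketch {
    private final Queue<byte[]> received = new ArrayDeque<>();
    private int lastRequestedSegmentId = -1;

    /** Assumed network-side hook: called when a buffer arrives. */
    void onBufferReceived(byte[] buffer) {
        received.add(buffer);
    }

    @Override
    public Optional<byte[]> readBuffer(int segmentId) {
        lastRequestedSegmentId = segmentId;
        // poll() returns null on an empty queue, which maps to Optional.empty().
        return Optional.ofNullable(received.poll());
    }

    int getLastRequestedSegmentId() {
        return lastRequestedSegmentId;
    }
}

final class ReaderDemo {
    public static void main(String[] args) {
        QueueBackedReaderSketch reader = new QueueBackedReaderSketch();
        System.out.println("before arrival: " + reader.readBuffer(0).isPresent());
        reader.onBufferReceived(new byte[] {42});
        System.out.println("after arrival: " + reader.readBuffer(0).isPresent());
    }
}
```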



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/impl/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl;

Review Comment:
   The `impl` package might not be necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/impl/NettyConnectionReaderImpl.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.impl;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** The default implementation of {@link NettyConnectionReader}. */
+public class NettyConnectionReaderImpl implements NettyConnectionReader {
+
+    private final int subpartitionId;
+    private final InputChannel[] inputChannels;
+    private final BiConsumer<Integer, Boolean> queueChannelCallback;
+    private final int[] lastPrioritySequenceNumber;
+    private int lastRequiredSegmentId = 0;

Review Comment:
   We need to explain what these fields are.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java:
##########
@@ -63,6 +67,8 @@ public class TieredResultPartition extends ResultPartition {
 
     private final TieredStorageResourceRegistry tieredStorageResourceRegistry;
 
+    private final TieredStorageNettyService nettyService;

Review Comment:
   Let's change this to `TieredStorageNettyServiceImpl` to make it explicit that the result partition sees the actual netty service implementation.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -311,6 +311,12 @@ public long unsynchronizedGetSizeOfQueuedBuffers() {
         return 0;
     }
 
+    // ------------------------------------------------------------------------
+    //  For tiered storage
+    // ------------------------------------------------------------------------
+
+    public void notifyRequiredSegmentId(int segmentId) {}

Review Comment:
   What are the semantics here? Who should implement it? If this will be completed in the future, we should add a `TODO` marker. If it is meant to be overridden, we should explain that in a comment.
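
If the method is meant to be overridden, the documentation could look roughly like this. The sketch assumes that only some channel types care about the notification; the class names and the idea that a remote channel records the segment id are hypothetical and not taken from the PR.

```java
/** Hypothetical base class standing in for an input channel. */
abstract class ChannelSketch {
    /**
     * Notify the channel which segment the consumer needs next.
     *
     * <p>The default implementation does nothing. Channel types that must forward
     * the request to the producer are expected to override this method.
     *
     * @param segmentId id of the required segment.
     */
    void notifyRequiredSegmentId(int segmentId) {}
}

/** Hypothetical channel that reacts to the notification. */
final class RemoteChannelSketch extends ChannelSketch {
    private int lastRequiredSegmentId = -1;

    @Override
    void notifyRequiredSegmentId(int segmentId) {
        // A real channel would send a request to the producer here.
        lastRequiredSegmentId = segmentId;
    }

    int getLastRequiredSegmentId() {
        return lastRequiredSegmentId;
    }
}

/** Hypothetical channel that keeps the default no-op behaviour. */
final class LocalChannelSketch extends ChannelSketch {}

final class ChannelDemo {
    public static void main(String[] args) {
        RemoteChannelSketch remote = new RemoteChannelSketch();
        ChannelSketch local = new LocalChannelSketch();
        remote.notifyRequiredSegmentId(2);
        local.notifyRequiredSegmentId(2); // intentionally has no effect
        System.out.println("remote recorded segment: " + remote.getLastRequiredSegmentId());
    }
}
```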



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferContext.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link BufferContext} represents a combination of buffer, buffer index, and its subpartition
+ * id, and it could also indicate an error or a segment id.
+ */
+public class BufferContext {

Review Comment:
   Maybe rename it to something like `NettyPayload`.





[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "WencongLiu (via GitHub)" <gi...@apache.org>.
WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223768558


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+    private static final int INPUT_CHANNEL_INDEX = 0;
+
+    @Test
+    void testReadBufferOfNonPriorityDataType() {
+        int bufferNumber = 1;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isTrue();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
+        int bufferNumber = 2;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, true, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isTrue();
+        assertThat(buffer.get().isBuffer()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        Tuple2<Integer, Boolean> result1 = availableAndPriorityConsumer.get();
+        assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result1.f1).isEqualTo(true);
+        Tuple2<Integer, Integer> result2 = prioritySequenceNumberConsumer.get();
+        assertThat(result2.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+        assertThat(result2.f1).isEqualTo(0);
+    }
+
+    @Test
+    void testReadEmptyBuffer() {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        (inputChannelIndex, priority) ->
+                                availableAndPriorityConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, priority)),
+                        (inputChannelIndex, sequenceNumber) ->
+                                prioritySequenceNumberConsumer.complete(
+                                        Tuple2.of(inputChannelIndex, sequenceNumber)));
+        Optional<Buffer> buffer = reader.readBuffer(0);
+        assertThat(buffer.isPresent()).isFalse();
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+        assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+    }
+
+    @Test
+    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
+        int bufferNumber = 0;
+        CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer =
+                new CompletableFuture<>();
+        CompletableFuture<Integer> requiredSegmentIdFuture = new CompletableFuture<>();
+        Supplier<InputChannel> inputChannelSupplier =
+                createInputChannelSupplier(bufferNumber, false, requiredSegmentIdFuture);
+        NettyConnectionReader reader =
+                createNettyConnectionReader(
+                        inputChannelSupplier,
+                        createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+                        createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+        reader.readBuffer(0);
+        assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+        reader.readBuffer(1);
+        assertThat(requiredSegmentIdFuture.get()).isEqualTo(1);
+    }
+
+    private BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(
+            CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer) {
+        return (inputChannelIndex, priority) ->
+                availableAndPriorityConsumer.complete(Tuple2.of(inputChannelIndex, priority));
+    }
+
+    private BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(
+            CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer) {
+        return (inputChannelIndex, sequenceNumber) ->
+                prioritySequenceNumberConsumer.complete(
+                        Tuple2.of(inputChannelIndex, sequenceNumber));
+    }
+
+    private Supplier<InputChannel> createInputChannelSupplier(
+            int bufferNumber,
+            boolean priority,
+            CompletableFuture<Integer> requiredSegmentIdFuture) {
+        TestInputChannel inputChannel =
+                new TestInputChannel(
+                        new SingleInputGateBuilder().build(),
+                        INPUT_CHANNEL_INDEX,
+                        requiredSegmentIdFuture);
+        try {
+            if (bufferNumber == 1) {

Review Comment:
   Yes. With only one NetworkBuffer, the next data type that TestInputChannel reports for it won't be NONE, which is not what the test expects.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223820584


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+    private final BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+            connectionEstablishedConsumer;
+
+    private final Consumer<NettyConnectionId> connectionBrokenConsumer;
+
+    private TestingNettyServiceProducer(
+            BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>
+                    connectionEstablishedConsumer,
+            Consumer<NettyConnectionId> connectionBrokenConsumer) {
+        this.connectionEstablishedConsumer = connectionEstablishedConsumer;
+        this.connectionBrokenConsumer = connectionBrokenConsumer;
+    }
+
+    @Override
+    public void connectionEstablished(
+            TieredStorageSubpartitionId subpartitionId,
+            NettyConnectionWriter nettyConnectionWriter) {
+        connectionEstablishedConsumer.accept(subpartitionId, nettyConnectionWriter);
+    }
+
+    @Override
+    public void connectionBroken(NettyConnectionId connectionId) {
+        connectionBrokenConsumer.accept(connectionId);
+    }
+
+    /** Builder for {@link TestingNettyServiceProducer}. */
+    public static class Builder {
+
+        private BiConsumer<TieredStorageSubpartitionId, NettyConnectionWriter>

Review Comment:
   Please provide a default implementation so that the code using this builder can be simplified.
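
   One possible shape for this, as a minimal sketch rather than the final code: initialize each builder field with a no-op lambda so that a test only sets the callback it cares about. The class name `TestingProducerSketch` and the setter names are hypothetical, and `Integer`, `Object`, and `String` stand in for `TieredStorageSubpartitionId`, `NettyConnectionWriter`, and `NettyConnectionId` so that the example compiles on its own.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Sketch only: Integer, Object and String stand in for the Flink types. */
final class TestingProducerSketch {

    private final BiConsumer<Integer, Object> connectionEstablishedConsumer;
    private final Consumer<String> connectionBrokenConsumer;

    private TestingProducerSketch(
            BiConsumer<Integer, Object> connectionEstablishedConsumer,
            Consumer<String> connectionBrokenConsumer) {
        this.connectionEstablishedConsumer = connectionEstablishedConsumer;
        this.connectionBrokenConsumer = connectionBrokenConsumer;
    }

    void connectionEstablished(Integer subpartitionId, Object writer) {
        connectionEstablishedConsumer.accept(subpartitionId, writer);
    }

    void connectionBroken(String connectionId) {
        connectionBrokenConsumer.accept(connectionId);
    }

    /** Builder whose fields default to no-op callbacks. */
    static final class Builder {

        // No-op defaults: build() never hands out a null callback.
        private BiConsumer<Integer, Object> connectionEstablishedConsumer =
                (subpartitionId, writer) -> {};
        private Consumer<String> connectionBrokenConsumer = connectionId -> {};

        Builder setConnectionEstablishedConsumer(BiConsumer<Integer, Object> consumer) {
            this.connectionEstablishedConsumer = consumer;
            return this;
        }

        Builder setConnectionBrokenConsumer(Consumer<String> consumer) {
            this.connectionBrokenConsumer = consumer;
            return this;
        }

        TestingProducerSketch build() {
            return new TestingProducerSketch(
                    connectionEstablishedConsumer, connectionBrokenConsumer);
        }
    }
}
```

   With defaults in place, a test that only checks `connectionBroken` could write `new TestingProducerSketch.Builder().setConnectionBrokenConsumer(ids::add).build()` and leave the other callback unset.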


