You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "xintongsong (via GitHub)" <gi...@apache.org> on 2023/04/06 02:53:27 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1159213039


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {

Review Comment:
   Why do we need this interface? Or asked differently, why do we need `TierType` and `SupportedTierCombinations` implementing the same interface?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;

Review Comment:
   1. I'd suggest `o.a.f.r.i.n.partition.hybrid.tiered`.
   2. Why does this belong to `upstream`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers is contained
+     * in the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,
+    }

Review Comment:
   Why are we limiting the supported combinations at all?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.

Review Comment:
   That means `IN_CACHE` is different from the other three. Then why are they in the same enum?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,

Review Comment:
   JavaDoc is required for each enum value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;

Review Comment:
   JavaDoc is required for all methods in an interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers is contained
+     * in the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,

Review Comment:
   JavaDoc is required for each enum value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;

Review Comment:
   This is not a common interface for all tiers, thus does not belong to this interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);

Review Comment:
   Use tiered store language. `subpartitionId` can be replaced by `producerId`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);

Review Comment:
   Why is this method in this interface? Shouldn't it belong to something like a consumer client?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);
+
+    void close();
+
+    void release();

Review Comment:
   JavaDoc is required for all methods in an interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/MemorySegmentAndChannel.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** {@link MemorySegment} info and the corresponding channel index. */
+public class MemorySegmentAndChannel {

Review Comment:
   This seems belong to the wrong commit? All its related changes are in the other commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);

Review Comment:
   Why do we need this for the tiers?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/cache/BufferAccumulatorImpl.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.MemorySegmentAndChannel;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * The implementation of the {@link BufferAccumulator}. The {@link BufferAccumulator} receives the
+ * records from {@link TieredStoreProducer} and the records will accumulate and transform to
+ * finished {@link * MemorySegment}s. The finished memory segments will be transferred to the
+ * corresponding tier dynamically.
+ */
+public class BufferAccumulatorImpl implements BufferAccumulator {

Review Comment:
   Why are we introducing the implementation now if it doesn't provide any actual implementation?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreConfiguration.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The configuration for tiered store. */
+public class TieredStoreConfiguration {
+
+    private final String tieredStoreTiers;

Review Comment:
   This should be an explicit type.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreResultPartition.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TieredStoreResultPartition} appends records and events to the tiered store, which supports
+ * the upstream dynamically switches storage tier for writing shuffle data, and the downstream will
+ * read data from the relevant storage tier.
+ */
+public class TieredStoreResultPartition extends ResultPartition implements ChannelStateHolder {

Review Comment:
   Why implementing `ChannelStateHolder`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a common entrypoint of emitting records to tiered store. These records will be emitted to
+ * the {@link BufferAccumulator} to accumulate and transform into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    public TieredStoreProducerImpl(
+            StorageTier[] storageTiers,
+            int numSubpartitions,
+            int bufferSize,
+            boolean isBroadcastOnly,
+            @Nullable BufferCompressor bufferCompressor) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+
+        this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   Dependency should be injected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org