Posted to issues@flink.apache.org by "TanYuxin-tyx (via GitHub)" <gi...@apache.org> on 2023/04/03 10:36:32 UTC

[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

TanYuxin-tyx opened a new pull request, #22330:
URL: https://github.com/apache/flink/pull/22330

   
   ## What is the purpose of the change
   
   Support writing records to the new tiered store architecture. 
   
   The PR introduces a new `TieredStoreResultPartition` and the producer-side tiered store APIs, including `TieredStoreProducer`, `BufferAccumulator`, `StorageTier`, and `TierWriter`. 
   
   `TierReaderView` is empty for now: it is referenced by `StorageTier`, and its methods will be introduced in a subsequent JIRA issue.
   
   `BufferAccumulatorImpl` is only a default implementation; the real implementation will be introduced in a subsequent JIRA issue.
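   To make the intended write path concrete, here is a minimal sketch of how the producer-side pieces might fit together. It assumes that records are packed per subpartition into fixed-size buffers and that each finished buffer goes to the first tier, in priority order, that accepts it. All names below (`ProducerFlowSketch`, `SketchBufferAccumulator`, `SketchTierWriter#tryWrite`, ...) are hypothetical stand-ins, not the APIs added in this PR, and `ByteBuffer` stands in for Flink's network buffers.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiConsumer;

   /** Hypothetical sketch of the producer-side write path; not the classes added in this PR. */
   final class ProducerFlowSketch {

       /** Assumed tier contract: returns false if the tier cannot take the buffer right now. */
       interface SketchTierWriter {
           boolean tryWrite(int subpartition, ByteBuffer finishedBuffer);
       }

       /** Packs records into fixed-size buffers per subpartition and emits full buffers. */
       static final class SketchBufferAccumulator {
           private final int bufferSize;
           private final ByteBuffer[] current;
           private final BiConsumer<Integer, ByteBuffer> onFinished;

           SketchBufferAccumulator(
                   int numSubpartitions, int bufferSize, BiConsumer<Integer, ByteBuffer> onFinished) {
               this.bufferSize = bufferSize;
               this.current = new ByteBuffer[numSubpartitions];
               this.onFinished = onFinished;
           }

           /** Simplification: a single record must fit into one buffer. */
           void receive(byte[] record, int subpartition) {
               if (record.length > bufferSize) {
                   throw new IllegalArgumentException("Record larger than a buffer");
               }
               ByteBuffer buffer = current[subpartition];
               if (buffer != null && buffer.remaining() < record.length) {
                   finish(subpartition); // the current buffer is full enough, emit it
                   buffer = null;
               }
               if (buffer == null) {
                   buffer = ByteBuffer.allocate(bufferSize);
                   current[subpartition] = buffer;
               }
               buffer.put(record);
           }

           /** Emits all partially filled buffers, e.g. at the end of the partition. */
           void flushAll() {
               for (int i = 0; i < current.length; i++) {
                   if (current[i] != null && current[i].position() > 0) {
                       finish(i);
                   }
               }
           }

           private void finish(int subpartition) {
               ByteBuffer buffer = current[subpartition];
               current[subpartition] = null;
               buffer.flip(); // switch the buffer from writing to reading mode
               onFinished.accept(subpartition, buffer);
           }
       }

       /** Offers each finished buffer to the tiers in priority order (e.g. memory, then disk). */
       static final class SketchProducer {
           private final List<SketchTierWriter> tiers;

           SketchProducer(List<SketchTierWriter> tiers) {
               this.tiers = new ArrayList<>(tiers);
           }

           void write(int subpartition, ByteBuffer finishedBuffer) {
               for (SketchTierWriter tier : tiers) {
                   if (tier.tryWrite(subpartition, finishedBuffer)) {
                       return;
                   }
               }
               throw new IllegalStateException("No tier accepted the buffer");
           }
       }
   }
   ```

   With a memory tier that has room for only one buffer, followed by an unbounded disk tier, the first finished buffer lands in memory and later ones fall through to disk. Keeping the accumulator unaware of tiers (it only calls a callback) mirrors the split between `BufferAccumulator` and the tiers described above.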
   
   
   ## Brief change log
   
     - *Introduce `TieredStoreMode`, the enum of the tier types.*
     - *Introduce a new `TieredStoreResultPartition`.*
     - *Introduce the producer-side tiered store APIs, including `TieredStoreProducer`, `BufferAccumulator`, `StorageTier`, and `TierWriter`.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - *Added `TieredStoreResultPartitionTest`.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180104030


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   Removed it.
   In the first version, only the default tier types are supported, so I removed `memoryDiskTierTypes`.
   
   Currently, the default value is fixed, so I changed `TieredStorageConfiguration#getDefaultTierTypes` into a private method and resolve the tier types when building the configuration.
   
   There is a TODO here: `getDefaultTierTypes` will need a new argument in the future. Once the remote storage path option is added, the method will look like this:
   ```
       private static TierType[] getDefaultTierTypes(String remoteStorageHomePath) {
           return remoteStorageHomePath == null
                   ? DEFAULT_MEMORY_DISK_TIER_TYPES
                   : DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES;
       }
   ```
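   For illustration, a minimal self-contained sketch of resolving the tier types once, at build time, from an optional remote storage path. The `build` factory method, the `getTierTypes` accessor, and the concrete contents of `DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES` are assumptions made for this sketch, not code from the PR.

   ```java
   import java.util.Arrays;

   /** Hypothetical sketch; only the tier-type resolution idea comes from the comment above. */
   final class TieredStorageConfigurationSketch {

       enum TierType {
           IN_MEM,
           IN_DISK,
           IN_REMOTE,
       }

       private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES = {
           TierType.IN_MEM, TierType.IN_DISK
       };

       // Assumed: the remote tier is appended after memory and disk.
       private static final TierType[] DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES = {
           TierType.IN_MEM, TierType.IN_DISK, TierType.IN_REMOTE
       };

       private final TierType[] tierTypes;

       private TieredStorageConfigurationSketch(TierType[] tierTypes) {
           this.tierTypes = tierTypes;
       }

       /** Builds the configuration; a null path means no remote tier is configured. */
       static TieredStorageConfigurationSketch build(String remoteStorageHomePath) {
           return new TieredStorageConfigurationSketch(getDefaultTierTypes(remoteStorageHomePath));
       }

       private static TierType[] getDefaultTierTypes(String remoteStorageHomePath) {
           TierType[] defaults =
                   remoteStorageHomePath == null
                           ? DEFAULT_MEMORY_DISK_TIER_TYPES
                           : DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES;
           // Copy so that no caller can mutate the shared default arrays.
           return Arrays.copyOf(defaults, defaults.length);
       }

       TierType[] getTierTypes() {
           return Arrays.copyOf(tierTypes, tierTypes.length);
       }

       /** Index i refers to the i-th tier in priority order, as in getTierIndexes() in the diff. */
       int[] getTierIndexes() {
           int[] tierIndexes = new int[tierTypes.length];
           for (int i = 0; i < tierTypes.length; i++) {
               tierIndexes[i] = i;
           }
           return tierIndexes;
       }
   }
   ```

   Resolving the types in the factory method keeps the constructor trivial and means later code never needs to know whether a remote path was configured.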
   
   




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122114


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {
+        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
+        resourceRegistry.registerResource(
+                partitionId, () -> tiers.forEach(TierMasterAgent::release));

Review Comment:
   These methods have been renamed to `addPartition` and `releasePartition`.
   
   Each tier decides for itself what to do, i.e., whether to register or release resources. (That code is not added yet; it will be added when each tier is implemented.)
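   A minimal sketch of the delegation described above, where the master-side client only tracks which partitions belong to which job and lets every tier decide what adding or releasing a partition means. `SketchTierMasterAgent`, `releaseAllPartitions`, and the plain `String` ids (standing in for `TieredStorageJobId` and `TieredStoragePartitionId`) are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical sketch of a master-side client that delegates to per-tier agents. */
   final class MasterClientSketch {

       /** Assumed per-tier hook; each tier decides what registering or releasing means. */
       interface SketchTierMasterAgent {
           void addPartition(String partitionId);

           void releasePartition(String partitionId);
       }

       private final List<SketchTierMasterAgent> tiers;
       private final Map<String, List<String>> jobPartitionIds = new HashMap<>();

       MasterClientSketch(List<SketchTierMasterAgent> tiers) {
           this.tiers = new ArrayList<>(tiers);
       }

       void addPartition(String jobId, String partitionId) {
           jobPartitionIds.computeIfAbsent(jobId, ignored -> new ArrayList<>()).add(partitionId);
           tiers.forEach(tier -> tier.addPartition(partitionId));
       }

       void releasePartition(String partitionId) {
           // Forget the partition in whichever job tracks it and drop jobs left without partitions.
           jobPartitionIds.values().forEach(ids -> ids.remove(partitionId));
           jobPartitionIds.values().removeIf(List::isEmpty);
           tiers.forEach(tier -> tier.releasePartition(partitionId));
       }

       /** Releases every remaining partition of a job, e.g. when the job terminates. */
       void releaseAllPartitions(String jobId) {
           List<String> ids = jobPartitionIds.remove(jobId);
           if (ids == null) {
               return;
           }
           for (String id : ids) {
               tiers.forEach(tier -> tier.releasePartition(id));
           }
       }

       int numTrackedJobs() {
           return jobPartitionIds.size();
       }
   }
   ```

   Because the client never inspects what a tier does, a tier that has nothing to register (for example, a pure in-memory tier) can simply implement both hooks as no-ops.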




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161501622


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreConfiguration.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The configuration for tiered store. */
+public class TieredStoreConfiguration {
+
+    private final String tieredStoreTiers;

Review Comment:
   This change should not be included in this PR, so I have removed it. I will use an explicit type when submitting the new PR.
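   As a rough illustration of what "an explicit type" could look like compared with a raw `String tieredStoreTiers` field, here is a sketch that parses a comma-separated value once, at the configuration boundary, into an enum list and fails fast on unknown names. The comma-separated format, `TierListParserSketch`, and its enum are assumptions for this sketch only.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Locale;

   /** Hypothetical sketch: turn a raw tier string into typed values once. */
   final class TierListParserSketch {

       enum TierType {
           IN_MEM,
           IN_DISK,
           IN_REMOTE,
       }

       /** Parses e.g. "IN_MEM,IN_DISK"; throws IllegalArgumentException for unknown tiers. */
       static List<TierType> parse(String raw) {
           List<TierType> result = new ArrayList<>();
           for (String token : raw.split(",")) {
               String name = token.trim().toUpperCase(Locale.ROOT);
               if (name.isEmpty()) {
                   continue; // tolerate trailing commas
               }
               result.add(TierType.valueOf(name));
           }
           return Collections.unmodifiableList(result);
       }
   }
   ```

   Downstream code then only ever sees `List<TierType>`, so a typo in the configuration surfaces at startup rather than deep inside the shuffle code.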




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498489


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers are not one
+     * of the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,
+    }

Review Comment:
   `SupportedTierCombinations` is removed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161497423


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;

Review Comment:
   I have moved these classes to `o.a.f.r.i.n.partition.hybrid.tiered`.
   The `TieredStoreMode` is removed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180093470


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {

Review Comment:
   Removed it.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187052528


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -23,6 +23,9 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;

Review Comment:
   OK, we can remove the dependencies in the follow-up ticket.




[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1162243590


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   Why do we need `isEndOfPartition` and `segmentId` for each emit?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)
+            throws IOException;
+
+    /** Closes the {@link TierStorage}, the opened channels should be closed. */
+    void close();
+
+    /** Releases the {@link TierStorage}, all resources should be released. */
+    void release();

Review Comment:
   What are the differences between these two methods?
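   Going only by the Javadoc in the diff ("the opened channels should be closed" versus "all resources should be released"), one possible reading is that `close()` ends the writing phase while the written data stays available, and `release()` additionally discards the data. A minimal sketch of that reading, with hypothetical names and an in-memory map standing in for real channels; it is not what the PR implements.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical sketch of one possible close/release split for a tier storage. */
   final class CloseVersusReleaseSketch {
       private final Map<Integer, List<byte[]>> data = new HashMap<>();
       private boolean closed;
       private boolean released;

       void write(int subpartition, byte[] buffer) {
           if (closed) {
               throw new IllegalStateException("Already closed for writing");
           }
           data.computeIfAbsent(subpartition, ignored -> new ArrayList<>()).add(buffer);
       }

       /** Ends the writing phase: no more writes, but written data stays readable. */
       void close() {
           closed = true;
       }

       /** Frees everything, including data that consumers might still want to read. */
       void release() {
           close();
           released = true;
           data.clear();
       }

       int numBuffers(int subpartition) {
           if (released) {
               throw new IllegalStateException("Already released");
           }
           return data.getOrDefault(subpartition, List.of()).size();
       }
   }
   ```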



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers is independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+    TierStorage createPartitionTierStorage();

Review Comment:
   This doesn't make sense. Why are we creating a storage from a writer? Shouldn't it be the other way around?
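   One way to read "the other way around" is that a long-lived storage object per tier owns the data and hands out lightweight per-partition writers. A minimal sketch under that assumption; `StorageCreatesWriterSketch`, `createWriter`, and the `String` partition ids are hypothetical and not part of the PR.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical sketch: the storage owns the data and creates writers. */
   final class StorageCreatesWriterSketch {

       /** Per-partition handle used by the producer. */
       interface SketchWriter {
           void write(int subpartition, byte[] buffer);
       }

       /** One instance per tier; outlives the individual writers it creates. */
       static final class SketchInMemoryTierStorage {
           private final Map<String, List<byte[]>> dataByPartition = new HashMap<>();

           SketchWriter createWriter(String partitionId) {
               List<byte[]> target =
                       dataByPartition.computeIfAbsent(partitionId, ignored -> new ArrayList<>());
               // Subpartition bookkeeping is omitted to keep the sketch short.
               return (subpartition, buffer) -> target.add(buffer);
           }

           int numBuffers(String partitionId) {
               return dataByPartition.getOrDefault(partitionId, List.of()).size();
           }
       }
   }
   ```

   In this shape a reader could later be obtained from the same storage object, which would also address the question of why a storage exposes only writing interfaces.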



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TierType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The storage types for tiered store. */
+public enum TierType {

Review Comment:
   The enum type indicates that only these 3 types are supported. The question is which components should be aware of that. Shouldn't this be limited to a factory or something?
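   A minimal sketch of limiting knowledge of the concrete tier kinds to factories, so that the rest of the code depends only on a generic tier interface and adding a tier does not require touching an enum. `TierFactorySketch` and all nested names are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical sketch: only the factories know which concrete tiers exist. */
   final class TierFactorySketch {

       /** What the rest of the code depends on; no knowledge of concrete tier kinds. */
       interface SketchTier {
           String name();
       }

       /** Each factory knows how to build exactly one kind of tier. */
       interface SketchTierFactory {
           SketchTier createTier();
       }

       static final class MemoryTierFactory implements SketchTierFactory {
           public SketchTier createTier() {
               return () -> "memory";
           }
       }

       static final class DiskTierFactory implements SketchTierFactory {
           public SketchTier createTier() {
               return () -> "disk";
           }
       }

       /** Callers pass factories in priority order and receive opaque tiers back. */
       static List<SketchTier> createTiers(List<SketchTierFactory> factories) {
           List<SketchTier> tiers = new ArrayList<>();
           for (SketchTierFactory factory : factories) {
               tiers.add(factory.createTier());
           }
           return tiers;
       }
   }
   ```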



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   And why does a `Storage` only have writing interfaces but no reading ones?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   `partition`, `subpartition` and `emit` are still shuffle terminology.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500942


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/MemorySegmentAndChannel.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** {@link MemorySegment} info and the corresponding channel index. */
+public class MemorySegmentAndChannel {

Review Comment:
   This is a class used in `BufferAccumulator`. The finished memory segment (without a buffer recycler) should be emitted to each tier.
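Going by this description, `BufferAccumulator` packs records into memory segments and hands each finished segment, tagged with its channel (subpartition) index, to the tiers. Below is a minimal sketch of that idea. It uses heap `ByteBuffer`s in place of Flink `MemorySegment`s and does no buffer recycling. `SegmentAndChannel`, `SketchAccumulator`, and the fixed segment size are hypothetical. The real `BufferAccumulator` interface is not shown in this thread.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical stand-in for MemorySegmentAndChannel: a finished segment plus its channel index.
final class SegmentAndChannel {
    final ByteBuffer segment;
    final int channelIndex;

    SegmentAndChannel(ByteBuffer segment, int channelIndex) {
        this.segment = segment;
        this.channelIndex = channelIndex;
    }
}

// Copies record bytes into one fixed-size segment per channel and hands off full segments.
class SketchAccumulator {
    private final int segmentSize;
    private final ByteBuffer[] currentSegments;
    private final Consumer<SegmentAndChannel> finishedSegmentListener;

    SketchAccumulator(int numChannels, int segmentSize, Consumer<SegmentAndChannel> listener) {
        this.segmentSize = segmentSize;
        this.currentSegments = new ByteBuffer[numChannels];
        this.finishedSegmentListener = listener;
    }

    void receive(ByteBuffer record, int channel) {
        while (record.hasRemaining()) {
            if (currentSegments[channel] == null) {
                currentSegments[channel] = ByteBuffer.allocate(segmentSize);
            }
            ByteBuffer segment = currentSegments[channel];
            int n = Math.min(segment.remaining(), record.remaining());
            // Copy n bytes through a view so the record's own limit is left untouched.
            ByteBuffer chunk = record.duplicate();
            chunk.limit(chunk.position() + n);
            segment.put(chunk);
            record.position(record.position() + n);
            if (!segment.hasRemaining()) {
                finish(channel);
            }
        }
    }

    // Emits partially filled segments, e.g. at the end of the input.
    void flush() {
        for (int channel = 0; channel < currentSegments.length; channel++) {
            ByteBuffer segment = currentSegments[channel];
            if (segment != null && segment.position() > 0) {
                finish(channel);
            }
        }
    }

    private void finish(int channel) {
        ByteBuffer segment = currentSegments[channel];
        segment.flip(); // make the written bytes readable from position 0
        currentSegments[channel] = null;
        finishedSegmentListener.accept(new SegmentAndChannel(segment, channel));
    }
}
```

Flipping the segment before handing it off lets the receiving tier read it from position 0 without knowing how full it is.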





[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1501547675

   @xintongsong Thanks for reviewing the PR. I have updated the code according to the comments.
   
   Could you please review it again when you have time?




[GitHub] [flink] flinkbot commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1494091815

   ## CI report:
   
   * 6099967a9b9ead99c7c6dee78ec6ec0a6be211d4 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-runs the last Azure build




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498829


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;

Review Comment:
   Fixed.





[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1501526262

   @flinkbot run azure




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122535


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {

Review Comment:
   Added documentation for it.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180097369


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
+/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */
+public class TieredStorageIdMappingUtils {
+
+    public static TieredStorageJobId convertId(JobID jobID) {
+        return new TieredStorageJobId(jobID.getBytes());
+    }
+
+    public static JobID convertId(TieredStorageJobId tieredStorageJobId) {
+        return new JobID(tieredStorageJobId.getId());
+    }
+
+    public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) {
+        return new TieredStorageTopicId(intermediateDataSetID.getBytes());
+    }
+
+    public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) {
+        return new IntermediateDataSetID(new AbstractID(topicId.getId()));
+    }
+
+    public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) {
+        ByteBuf byteBuf = Unpooled.buffer();

Review Comment:
   Resolved. I added a `getBytes` method to `ResultPartitionID`.
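The original hunk built the partition ID bytes through a Netty `ByteBuf`. The fix adds a `getBytes` method to `ResultPartitionID` instead, but its body is not shown here. A `ResultPartitionID` is made of an `IntermediateResultPartitionID` and an `ExecutionAttemptID`, and both are imported in the hunk. One plausible encoding is to concatenate their bytes with a length prefix so the two parts can be split again. Below is a minimal sketch of that encoding using `java.nio.ByteBuffer`. `IdBytes.pack` and `IdBytes.unpack` are hypothetical helpers, not Flink APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: packs two ID byte arrays into one array and splits them again.
final class IdBytes {
    private IdBytes() {}

    static byte[] pack(byte[] partitionIdBytes, byte[] attemptIdBytes) {
        ByteBuffer buffer =
                ByteBuffer.allocate(Integer.BYTES + partitionIdBytes.length + attemptIdBytes.length);
        buffer.putInt(partitionIdBytes.length); // length prefix marks where the first part ends
        buffer.put(partitionIdBytes);
        buffer.put(attemptIdBytes);
        return buffer.array();
    }

    static byte[][] unpack(byte[] packed) {
        ByteBuffer buffer = ByteBuffer.wrap(packed);
        byte[] partitionIdBytes = new byte[buffer.getInt()];
        buffer.get(partitionIdBytes);
        // Whatever follows the first part belongs to the second part.
        byte[] attemptIdBytes = new byte[buffer.remaining()];
        buffer.get(attemptIdBytes);
        return new byte[][] {partitionIdBytes, attemptIdBytes};
    }
}
```

If both parts are known to have fixed lengths, the length prefix can be dropped and the split point hard-coded.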





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092087


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;

Review Comment:
   Fixed.
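The hunk declares `TieredStorageAbstractId` as a `Serializable` identifier with an explicit `serialVersionUID`. It also imports `bytesToHexString` and `randomBytes` helpers. The thread does not show what was fixed. Below is a minimal sketch of the general pattern: a byte-array ID with value semantics and a hex `toString`. It assumes IDs compare by content. `SketchAbstractId` and its methods are hypothetical and are not the Flink class.

```java
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical byte-array-backed ID with value semantics.
class SketchAbstractId implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();
    private static final SecureRandom RANDOM = new SecureRandom();

    private final byte[] id;

    SketchAbstractId(byte[] id) {
        if (id == null || id.length == 0) {
            throw new IllegalArgumentException("id must not be empty");
        }
        this.id = id.clone(); // defensive copy keeps the ID immutable
    }

    static SketchAbstractId random(int numBytes) {
        byte[] bytes = new byte[numBytes];
        RANDOM.nextBytes(bytes);
        return new SketchAbstractId(bytes);
    }

    byte[] getId() {
        return id.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return Arrays.equals(id, ((SketchAbstractId) o).id);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(id);
    }

    @Override
    public String toString() {
        // Two lowercase hex digits per byte.
        StringBuilder sb = new StringBuilder(id.length * 2);
        for (byte b : id) {
            sb.append(HEX_DIGITS[(b >> 4) & 0xF]).append(HEX_DIGITS[b & 0xF]);
        }
        return sb.toString();
    }
}
```

The defensive copy matters because a shared `byte[]` could otherwise be mutated after the ID has been used as a map key.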





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122783


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {
+            for (int i = 0; i < numSubpartitions; ++i) {
+                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+            }
+        } else {
+            bufferAccumulator.receive(record, subpartitionId, dataType);
+        }
+    }
+
+    public void close() {
+        bufferAccumulator.close();
+        tierProducerAgents.forEach(TierProducerAgent::close);
+    }
+
+    public void writeFinishedBuffers(

Review Comment:
   Renamed it.
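In the `write` method quoted above, a broadcast record in a partition that is not broadcast-only is handed to the accumulator once per subpartition, each time as a `record.duplicate()`. Below is a minimal sketch of that fan-out. It uses hypothetical `RecordingSink` and `BroadcastFanOut` types, with `int` subpartition indexes standing in for `TieredStorageSubpartitionId`. The quoted hunk passes `subpartitionId` on every iteration, while this sketch passes the loop index `i`. That choice assumes the intent is one copy per subpartition.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that records which subpartition received which bytes.
class RecordingSink {
    final List<Integer> targets = new ArrayList<>();
    final List<byte[]> payloads = new ArrayList<>();

    void receive(ByteBuffer record, int subpartition) {
        byte[] bytes = new byte[record.remaining()];
        record.get(bytes); // advances only this buffer view's position
        targets.add(subpartition);
        payloads.add(bytes);
    }
}

class BroadcastFanOut {
    private final int numSubpartitions;
    private final boolean isBroadcastOnly;
    private final RecordingSink sink;

    BroadcastFanOut(int numSubpartitions, boolean isBroadcastOnly, RecordingSink sink) {
        this.numSubpartitions = numSubpartitions;
        this.isBroadcastOnly = isBroadcastOnly;
        this.sink = sink;
    }

    void write(ByteBuffer record, int subpartition, boolean isBroadcast) {
        if (isBroadcast && !isBroadcastOnly) {
            // Mixed partition: every subpartition needs its own copy of a broadcast record.
            for (int i = 0; i < numSubpartitions; ++i) {
                // duplicate() shares the bytes but has its own position,
                // so each receive() sees the whole record.
                sink.receive(record.duplicate(), i);
            }
        } else {
            // Unicast record, or a broadcast-only partition (assumed to need a single copy).
            sink.receive(record, subpartition);
        }
    }
}
```

Using `duplicate()` avoids copying the record bytes for each subpartition. Only the lightweight buffer view is cloned.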




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187053105


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {

Review Comment:
   Fixed. I have moved the internal types and internal methods to the end of the class.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058942


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The test utils for the tiered storage tests. */
+public class TieredStorageTestUtils {

Review Comment:
   Removed it.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164069146


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TierType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The storage types for tiered store. */
+public enum TierType {

Review Comment:
   I have made `TierType` a private enum in `TieredStoreShuffleEnvironment`. 
   The enum does not imply that only these 3 types are supported; it only makes it easy to add new tiers if needed.
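A minimal sketch of the private-enum approach, assuming the three tiers are the memory, disk and remote tiers (`IN_MEM`, `IN_DISK`, `IN_REMOTE` from the earlier `TieredStoreMode` draft). `ShuffleEnvironmentSketch` and `tierNamesInOrder` are hypothetical names, not Flink API. Because the enum is private, no caller can depend on the exact set of tiers, and adding a tier is a one-line change:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for TieredStoreShuffleEnvironment; not the real Flink class. */
class ShuffleEnvironmentSketch {

    /** Storage tiers in the order the producer tries them. Private, so callers never see it. */
    private enum TierType {
        /** Tier backed by memory buffers. */
        IN_MEM,
        /** Tier backed by local disk files. */
        IN_DISK,
        /** Tier backed by a remote storage system. */
        IN_REMOTE
    }

    /** Returns the tier names in declaration order; a new tier only needs a new constant. */
    static List<String> tierNamesInOrder() {
        List<String> names = new ArrayList<>();
        for (TierType type : TierType.values()) {
            names.add(type.name());
        }
        return names;
    }
}
```

Since `values()` preserves declaration order, the order of the constants can double as the tier priority in this sketch.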





[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1505204966

   Thanks for the comments, @xintongsong. I've updated the code. Could you please take another look?




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092466


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading from or writing to the tiered store. */
+public class TieredStorageUtils {
+
+    private static final String TIER_STORE_DIR = "tiered-store";
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        Random random = new Random();
+        byte[] bytes = new byte[length];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    public static String bytesToHexString(byte[] bytes) {

Review Comment:
   Removed; `StringUtils#byteToHexString` is used instead.
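For reference, the two helpers in this hunk behave roughly like the stand-alone sketch below. It assumes the uppercase hex table from the original `HEX_CHARS`; the PR now delegates the hex conversion to Flink's `StringUtils#byteToHexString`, whose output casing may differ. `HexIdSketch` is a hypothetical name:

```java
import java.util.Random;

/** Hypothetical stand-alone version of the helpers in TieredStorageUtils. */
final class HexIdSketch {

    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();

    private HexIdSketch() {}

    /** Returns {@code length} random bytes; rejects non-positive lengths like the original. */
    static byte[] randomBytes(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("Must be positive.");
        }
        byte[] bytes = new byte[length];
        new Random().nextBytes(bytes);
        return bytes;
    }

    /** Encodes each byte as two hex digits, high nibble first. */
    static String bytesToHexString(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int v = bytes[i] & 0xFF; // treat the byte as unsigned
            out[2 * i] = HEX_CHARS[v >>> 4];
            out[2 * i + 1] = HEX_CHARS[v & 0x0F];
        }
        return new String(out);
    }
}
```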



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {

Review Comment:
   Fixed.
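Judging from its imports (`Arrays`, `randomBytes`, `bytesToHexString`), `TieredStorageAbstractId` wraps a byte array. A minimal sketch of value semantics for such an identifier, assuming equality is defined on the byte contents; `ByteArrayIdSketch` is hypothetical and leaves out the `ByteBuf`/`DataOutput` serialization of the real class:

```java
import java.io.Serializable;
import java.util.Arrays;

/** Hypothetical byte-array-backed identifier with value semantics. */
class ByteArrayIdSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final byte[] id;

    ByteArrayIdSketch(byte[] id) {
        if (id == null || id.length == 0) {
            throw new IllegalArgumentException("Id must be non-null and non-empty.");
        }
        this.id = id.clone(); // defensive copy so callers cannot mutate the id
    }

    byte[] getId() {
        return id.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Two ids are equal when their byte contents are equal.
        return Arrays.equals(id, ((ByteArrayIdSketch) o).id);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(id);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (byte b : id) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }
}
```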





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502026


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreResultPartition.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TieredStoreResultPartition} appends records and events to the tiered store, which allows
+ * the upstream to dynamically switch the storage tier for writing shuffle data, while the downstream
+ * reads data from the relevant storage tier.
+ */
+public class TieredStoreResultPartition extends ResultPartition implements ChannelStateHolder {

Review Comment:
   This is a typo, fixed.
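The class Javadoc above describes an upstream that switches storage tiers dynamically. One way to picture this, assuming each new segment goes to the first tier (in priority order) whose `canStoreNextSegment` check passes, as in the `StorageTier` draft. `TierSketch`, `BoundedTierSketch` and `SegmentRouterSketch` are hypothetical, and the fixed segment capacity is purely illustrative:

```java
import java.util.List;

/** Hypothetical minimal view of a tier from the writer's side. */
interface TierSketch {
    String name();

    /** Mirrors StorageTier#canStoreNextSegment in the draft interface. */
    boolean canStoreNextSegment(int subpartitionId);

    /** Records that a segment of this subpartition is now being written to this tier. */
    void onSegmentStarted(int subpartitionId);
}

/** Tier with a fixed total segment capacity shared by all subpartitions; illustrative only. */
class BoundedTierSketch implements TierSketch {
    private final String name;
    private int remainingSegments;

    BoundedTierSketch(String name, int capacityInSegments) {
        this.name = name;
        this.remainingSegments = capacityInSegments;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public boolean canStoreNextSegment(int subpartitionId) {
        return remainingSegments > 0;
    }

    @Override
    public void onSegmentStarted(int subpartitionId) {
        remainingSegments--;
    }
}

/** Chooses, at each segment boundary, the first tier that accepts the next segment. */
class SegmentRouterSketch {
    private final List<TierSketch> tiersInPriorityOrder;

    SegmentRouterSketch(List<TierSketch> tiersInPriorityOrder) {
        this.tiersInPriorityOrder = tiersInPriorityOrder;
    }

    TierSketch startNewSegment(int subpartitionId) {
        for (TierSketch tier : tiersInPriorityOrder) {
            if (tier.canStoreNextSegment(subpartitionId)) {
                tier.onSegmentStarted(subpartitionId);
                return tier;
            }
        }
        throw new IllegalStateException("No tier can store the next segment.");
    }
}
```

With a memory tier that holds one segment followed by a disk tier, the first segment lands in memory and the second falls through to disk.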





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500134


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);

Review Comment:
   Fixed. This can live in the `BufferAccumulator`, so it does not need to be in this PR.
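On the consumer side, the `hasCurrentSegment` check in the hunk above suggests a lookup of the following shape, assuming the reader asks each tier in turn whether it holds the segment it needs next. `ReadableTierSketch` and `SegmentLookupSketch` are hypothetical names:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical tier that remembers which segments it holds per subpartition. */
class ReadableTierSketch {
    private final String name;
    private final Map<Integer, Set<Integer>> segmentsBySubpartition = new HashMap<>();

    ReadableTierSketch(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    void addSegment(int subpartitionId, int segmentIndex) {
        segmentsBySubpartition.computeIfAbsent(subpartitionId, k -> new HashSet<>()).add(segmentIndex);
    }

    /** Mirrors StorageTier#hasCurrentSegment in the draft interface. */
    boolean hasCurrentSegment(int subpartitionId, int segmentIndex) {
        return segmentsBySubpartition
                .getOrDefault(subpartitionId, Collections.emptySet())
                .contains(segmentIndex);
    }
}

final class SegmentLookupSketch {
    private SegmentLookupSketch() {}

    /** Returns the first tier that holds the requested segment, if any. */
    static Optional<ReadableTierSketch> findTierForSegment(
            List<ReadableTierSketch> tiers, int subpartitionId, int segmentIndex) {
        for (ReadableTierSketch tier : tiers) {
            if (tier.hasCurrentSegment(subpartitionId, segmentIndex)) {
                return Optional.of(tier);
            }
        }
        return Optional.empty();
    }
}
```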





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072696


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers are independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+    TierStorage createPartitionTierStorage();

Review Comment:
   Fixed the issue.
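The `TierWriter` Javadoc describes a two-step lifecycle: `setup()` once, then `createPartitionTierStorage()` to obtain a storage for shuffle data. A minimal in-memory sketch of that lifecycle; `TierStorageSketch`, its `write`/`numBuffers` methods and `InMemoryTierWriterSketch` are hypothetical, since the real `TierStorage` methods are not shown in this hunk:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-partition storage that just collects written buffers. */
interface TierStorageSketch {
    void write(byte[] buffer);

    int numBuffers();
}

/** Hypothetical writer: setup() prepares shared resources, then storages are created from it. */
class InMemoryTierWriterSketch {
    private boolean isSetup;

    void setup() throws IOException {
        // A real tier might create directories or connect to remote storage here.
        isSetup = true;
    }

    TierStorageSketch createPartitionTierStorage() {
        if (!isSetup) {
            throw new IllegalStateException("setup() must be called first.");
        }
        List<byte[]> buffers = new ArrayList<>();
        return new TierStorageSketch() {
            @Override
            public void write(byte[] buffer) {
                buffers.add(buffer);
            }

            @Override
            public int numBuffers() {
                return buffers.size();
            }
        };
    }
}
```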





[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1501539681

   @flinkbot run azure
   




[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1159213039


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {

Review Comment:
   Why do we need this interface? Or asked differently, why do we need `TierType` and `SupportedTierCombinations` implementing the same interface?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;

Review Comment:
   1. I'd suggest `o.a.f.r.i.n.partition.hybrid.tiered`.
   2. Why does this belong to `upstream`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers are not one of
+     * the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,
+    }

Review Comment:
   Why are we limiting the supported combinations at all?
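One possible alternative to enumerating fixed combinations, sketched under the assumption that any non-empty list of distinct tiers is acceptable and that the list order is the priority order. `IN_CACHE` is left out because the Javadoc above describes it as the accumulator rather than a storage tier. All names are hypothetical; this is not what the PR implements:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

/** Hypothetical storage tiers; the accumulator is deliberately not one of them. */
enum TierKindSketch {
    /** Memory tier. */
    MEMORY,
    /** Local disk tier. */
    DISK,
    /** Remote storage tier. */
    REMOTE
}

final class TierConfigValidatorSketch {
    private TierConfigValidatorSketch() {}

    /** Accepts any non-empty list of distinct tiers; the list order is the priority order. */
    static List<TierKindSketch> validate(List<TierKindSketch> configured) {
        if (configured.isEmpty()) {
            throw new IllegalArgumentException("At least one tier must be configured.");
        }
        EnumSet<TierKindSketch> seen = EnumSet.noneOf(TierKindSketch.class);
        for (TierKindSketch tier : configured) {
            if (!seen.add(tier)) {
                throw new IllegalArgumentException("Duplicate tier: " + tier);
            }
        }
        return new ArrayList<>(configured);
    }
}
```

Under this rule a combination such as memory plus remote, which none of the fixed enum values covers, would be accepted.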



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.

Review Comment:
   That means `IN_CACHE` is different from the other three. Then why are they in the same enum?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,

Review Comment:
   JavaDoc is required for each enum value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;

Review Comment:
   JavaDoc is required for all methods in an interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers are not one of
+     * the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,

Review Comment:
   JavaDoc is required for each enum value.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;

Review Comment:
   This is not common to all tiers, and thus does not belong in this interface.
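A sketch of how the reviewer's points could combine, assuming only some tiers serve reads through a reader view: the reader-side method moves to a separate, optional interface, every method carries JavaDoc, and `producerId` replaces `subpartitionId`. All names are hypothetical, and a `String` stands in for the real reader view type:

```java
import java.io.IOException;

/** Hypothetical common tier interface: only what every tier supports. */
interface CommonTierSketch {
    /** Prepares the tier, e.g. creating directories; called once before any writes. */
    void setup() throws IOException;

    /**
     * Called by the writer before starting a new segment.
     *
     * @param producerId the producer that wants to start a segment
     * @return whether this tier can store that producer's next segment
     */
    boolean canStoreNextSegment(int producerId);
}

/** Hypothetical optional capability for tiers that serve reads through a reader view. */
interface ReaderViewTierSketch extends CommonTierSketch {
    /**
     * Creates a view that reads the given producer's data from this tier.
     *
     * @param producerId the producer whose data is read
     * @return a description of the view, standing in for a real reader view type
     */
    String createReaderView(int producerId);
}

/** A tier that supports only the common operations. */
class NoReaderViewTierSketch implements CommonTierSketch {
    @Override
    public void setup() {}

    @Override
    public boolean canStoreNextSegment(int producerId) {
        return true;
    }
}

/** A tier that also serves reads through a reader view. */
class LocalTierSketch implements ReaderViewTierSketch {
    @Override
    public void setup() {}

    @Override
    public boolean canStoreNextSegment(int producerId) {
        return true;
    }

    @Override
    public String createReaderView(int producerId) {
        return "local-view-" + producerId;
    }
}
```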



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);

Review Comment:
   Use tiered-store terminology: `subpartitionId` can be replaced by `producerId`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls this method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);

Review Comment:
   Why is this method in this interface? Shouldn't it belong to something like a consumer client?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls this method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);
+
+    void close();
+
+    void release();

Review Comment:
   JavaDoc is required for all methods in an interface.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/MemorySegmentAndChannel.java:
##########
@@ -0,0 +1,57 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** {@link MemorySegment} info and the corresponding channel index. */
+public class MemorySegmentAndChannel {

Review Comment:
   This seems to belong to the wrong commit? All of its related changes are in the other commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls this method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);

Review Comment:
   Why do we need this for the tiers?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/cache/BufferAccumulatorImpl.java:
##########
@@ -0,0 +1,54 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.MemorySegmentAndChannel;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * The implementation of the {@link BufferAccumulator}. The {@link BufferAccumulator} receives the
+ * records from the {@link TieredStoreProducer}, accumulates them, and transforms them into finished
+ * {@link org.apache.flink.core.memory.MemorySegment}s. The finished memory segments are then
+ * transferred dynamically to the corresponding tier.
+ */
+public class BufferAccumulatorImpl implements BufferAccumulator {

Review Comment:
   Why are we introducing the implementation now if it doesn't provide any actual implementation?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreConfiguration.java:
##########
@@ -0,0 +1,52 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The configuration for tiered store. */
+public class TieredStoreConfiguration {
+
+    private final String tieredStoreTiers;

Review Comment:
   This should be an explicit type.
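One possible reading of this suggestion, sketched under assumptions: instead of holding the tier combination as a raw `String`, the configuration keeps a typed, validated list of tiers. The enum constants mirror `TieredStoreMode.TierType` from this PR; the class name `TypedTierConfiguration`, the `fromString` parser, and its comma-separated input format are hypothetical and are not the PR's actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/** Hypothetical sketch: the tier selection is held as a typed list instead of a raw String. */
final class TypedTierConfiguration {

    /** Mirrors the constants of TieredStoreMode.TierType in this PR, redeclared to stay self-contained. */
    enum TierType {
        IN_CACHE,
        IN_MEM,
        IN_DISK,
        IN_REMOTE
    }

    private final List<TierType> tiers;

    TypedTierConfiguration(List<TierType> tiers) {
        if (tiers.isEmpty()) {
            throw new IllegalArgumentException("At least one tier must be configured.");
        }
        // Defensive, unmodifiable copy so the configured tier order cannot change later.
        this.tiers = Collections.unmodifiableList(new ArrayList<>(tiers));
    }

    /** Hypothetical parser for a comma-separated value such as "IN_MEM,IN_DISK". */
    static TypedTierConfiguration fromString(String value) {
        List<TierType> parsed = new ArrayList<>();
        for (String token : value.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                // Enum.valueOf throws IllegalArgumentException for unknown names,
                // so a typo fails when the configuration is built, not deep in the runtime.
                parsed.add(TierType.valueOf(trimmed.toUpperCase(Locale.ROOT)));
            }
        }
        return new TypedTierConfiguration(parsed);
    }

    List<TierType> getTiers() {
        return tiers;
    }
}
```

With an explicit type, the rest of the code can switch over `TierType` values directly instead of re-parsing strings, and invalid tier names are rejected in one place.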



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreResultPartition.java:
##########
@@ -0,0 +1,282 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TieredStoreResultPartition} appends records and events to the tiered store, which allows
+ * the upstream to dynamically switch the storage tier used for writing shuffle data, while the
+ * downstream reads the data from the relevant storage tier.
+ */
+public class TieredStoreResultPartition extends ResultPartition implements ChannelStateHolder {

Review Comment:
   Why does it implement `ChannelStateHolder`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##########
@@ -0,0 +1,82 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The common entry point for emitting records to the tiered store. Records are emitted to the
+ * {@link BufferAccumulator}, which accumulates them and transforms them into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    public TieredStoreProducerImpl(
+            StorageTier[] storageTiers,
+            int numSubpartitions,
+            int bufferSize,
+            boolean isBroadcastOnly,
+            @Nullable BufferCompressor bufferCompressor) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+
+        this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   Dependency should be injected.
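A minimal sketch of constructor injection for this case, assuming simplified stand-ins: `SimpleBufferAccumulator` is a hypothetical one-method interface (not the PR's real `BufferAccumulator` signature), and `RecordingAccumulator` is an invented test double. The only point illustrated is that the producer receives its accumulator instead of constructing `BufferAccumulatorImpl` itself.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified stand-in for the PR's BufferAccumulator (not its real signature). */
interface SimpleBufferAccumulator {
    void receive(ByteBuffer record, int subpartitionId);
}

/** Sketch of a producer that receives its accumulator through the constructor. */
final class InjectedTieredStoreProducer {
    private final int numSubpartitions;
    private final SimpleBufferAccumulator bufferAccumulator;

    InjectedTieredStoreProducer(int numSubpartitions, SimpleBufferAccumulator bufferAccumulator) {
        if (numSubpartitions <= 0) {
            throw new IllegalArgumentException("numSubpartitions must be positive.");
        }
        this.numSubpartitions = numSubpartitions;
        // The caller chooses the implementation; the producer never instantiates it.
        this.bufferAccumulator = Objects.requireNonNull(bufferAccumulator);
    }

    void emit(ByteBuffer record, int subpartitionId) {
        if (subpartitionId < 0 || subpartitionId >= numSubpartitions) {
            throw new IndexOutOfBoundsException("Invalid subpartition: " + subpartitionId);
        }
        bufferAccumulator.receive(record, subpartitionId);
    }
}

/** Test double that only records which subpartitions received a record. */
final class RecordingAccumulator implements SimpleBufferAccumulator {
    final List<Integer> receivedSubpartitions = new ArrayList<>();

    @Override
    public void receive(ByteBuffer record, int subpartitionId) {
        receivedSubpartitions.add(subpartitionId);
    }
}
```

In production code, whoever creates the producer (for example the result partition or a factory) would build the real accumulator and pass it in; a unit test can pass `RecordingAccumulator` instead and inspect what was forwarded.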



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1513080766

   Thanks for the comments, @xintongsong. I've updated the code. Could you please take another look?




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * Shuffle records are accumulated into finished buffers before being written to the
+     * corresponding tier; the accumulator is the {@link TierType#IN_CACHE} tier.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,

Review Comment:
   Fixed.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058705


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))
+                        .collect(Collectors.toList());
+        this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) {
+        jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID);
+        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    }
+
+    public void releasePartition(ResultPartitionID resultPartitionID) {
+        tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+    }
+
+    public void unregisterJob(JobID jobID) {
+        List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID);
+        if (resultPartitionIDs != null) {
+            resultPartitionIDs.forEach(
+                    resultPartitionID ->
+                            tieredStorageMasterClient.releasePartition(
+                                    convertId(resultPartitionID)));
+        }
+    }
+
+    private TieredStorageConfiguration createTieredStorageConfiguration(Configuration conf) {
+        // TODO: read the configured options (e.g., remote storage path, reserved storage size)
+        // from the configuration, then set them on the builder.

Review Comment:
   Replaced this with a `TieredStorageConfiguration#fromConfiguration`.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180124978


##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final ResourceRegistry resourceRegistry;

Review Comment:
   Added a `TieredInternalShuffleMaster` wrapper class.
   All the tiered storage operations with the shuffle master should be wrapped in this class.
   
   The `ResourceRegistry` is now a local variable in the constructor of `TieredInternalShuffleMaster`.





[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1181363214


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredResource.java:
##########
@@ -0,0 +1,26 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/** The resource (e.g., local files, remote storage files, etc.) for the Tiered Storage. */
+public interface TieredResource {

Review Comment:
   I'd suggest the name `TieredStorageResource`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))
+                        .collect(Collectors.toList());
+        this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) {
+        jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID);
+        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    }
+
+    public void releasePartition(ResultPartitionID resultPartitionID) {
+        tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+    }
+
+    public void unregisterJob(JobID jobID) {
+        List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID);
+        if (resultPartitionIDs != null) {
+            resultPartitionIDs.forEach(
+                    resultPartitionID ->
+                            tieredStorageMasterClient.releasePartition(
+                                    convertId(resultPartitionID)));
+        }
+    }

Review Comment:
   Each partition may be released multiple times: once from `releasePartition` and once from `unregisterJob`, not to mention the possibility that `releasePartition` and `unregisterJob` may themselves be called more than once for the same partition/job ID. This means all the classes handling partition release (`TieredStorageMasterClient`, `TierMasterAgent`, etc.) would need to be aware of this possibility.
   
   Alternatively, we can guarantee that each partition is released only once, by eagerly removing the partition from `jobPartitionIds` in `releasePartition` and calling `TieredStorageMasterClient#releasePartition` only if the partition can be found in `jobPartitionIds`.
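A minimal sketch of a variant of the release-once guarantee described in this comment, for illustration only: `String` IDs stand in for `JobID`/`ResultPartitionID`, the hypothetical `PartitionReleaser` interface stands in for `TieredStorageMasterClient#releasePartition`, and the `activePartitions` set is an assumption rather than the PR's code. The partition is removed from the tracking set first, and the release is forwarded only when that removal succeeded, so repeated or overlapping calls become no-ops.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for TieredStorageMasterClient; only the release call matters here. */
interface PartitionReleaser {
    void releasePartition(String partitionId);
}

/** Sketch: guarantees each partition reaches the releaser at most once. */
class ReleaseOnceShuffleMaster {
    private final PartitionReleaser releaser;
    private final Map<String, List<String>> jobPartitionIds = new HashMap<>();
    // Partitions that are registered and not yet released.
    private final Set<String> activePartitions = new HashSet<>();

    ReleaseOnceShuffleMaster(PartitionReleaser releaser) {
        this.releaser = releaser;
    }

    void addPartition(String jobId, String partitionId) {
        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
        activePartitions.add(partitionId);
    }

    void releasePartition(String partitionId) {
        // Remove eagerly: only the first caller sees 'true' and forwards the release.
        if (activePartitions.remove(partitionId)) {
            releaser.releasePartition(partitionId);
        }
    }

    void unregisterJob(String jobId) {
        List<String> partitionIds = jobPartitionIds.remove(jobId);
        if (partitionIds != null) {
            // Reuses the guarded path, so already-released partitions are skipped.
            partitionIds.forEach(this::releasePartition);
        }
    }
}
```

With such a guard, downstream classes like `TierMasterAgent` could assume they observe each release at most once.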



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))

Review Comment:
   We are calling `setup` immediately after creating the master agents. Do we plan to add more steps in between in subsequent PRs? If not, it might be better to pass the setup arguments directly to the creation method, so that the agents won't need to repeatedly check internally whether they have been set up.
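A sketch of that alternative, assuming hypothetical `AgentFactory`, `MasterAgent` and `StubResourceRegistry` types in place of `TierFactory`, `TierMasterAgent` and `ResourceRegistry`; the real signatures may differ. The factory method receives the setup arguments directly, so every agent it returns is already fully initialized.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for ResourceRegistry. */
class StubResourceRegistry {}

/** Hypothetical agent: receives its dependencies at construction, so it is always set up. */
interface MasterAgent {
    StubResourceRegistry registry();
}

/** Hypothetical factory variant whose creation method takes the setup arguments directly. */
interface AgentFactory {
    MasterAgent createMasterAgent(StubResourceRegistry registry);
}

class AgentBootstrap {
    static List<MasterAgent> createAgents(
            List<AgentFactory> factories, StubResourceRegistry registry) {
        // No separate setup() step, hence no "is this agent set up yet?" checks inside agents.
        return factories.stream()
                .map(factory -> factory.createMasterAgent(registry))
                .collect(Collectors.toList());
    }
}
```

This also avoids relying on `Stream#peek` for side effects; its Javadoc notes that the method exists mainly to support debugging.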



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {

Review Comment:
   Minor: In Flink, we usually organize the definition of a class in the following order: fields, methods, internal types.
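For illustration, a hypothetical configuration class laid out in that order: fields first, then constructors and methods, then nested types such as a builder. The class name, the builder, and the tier names are assumptions made for the example, not the contents of `TieredStorageConfiguration`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical configuration class laid out as: fields, methods, internal types. */
class ExampleStorageConfiguration {

    // 1. Fields.
    private final List<String> tierNames;

    // 2. Constructors and methods.
    private ExampleStorageConfiguration(List<String> tierNames) {
        this.tierNames = tierNames;
    }

    List<String> getTierNames() {
        return tierNames;
    }

    static Builder builder() {
        return new Builder();
    }

    // 3. Internal types last.
    static class Builder {
        private final List<String> tierNames = new ArrayList<>();

        Builder addTier(String name) {
            tierNames.add(name);
            return this;
        }

        ExampleStorageConfiguration build() {
            // Defensive, read-only copy so the built configuration is immutable.
            return new ExampleStorageConfiguration(
                    Collections.unmodifiableList(new ArrayList<>(tierNames)));
        }
    }
}
```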



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The test utils for the tiered storage tests. */
+public class TieredStorageTestUtils {

Review Comment:
   This class does not provide any util functions. And why would we need these constants in a dedicated class?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -23,6 +23,9 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;

Review Comment:
   I also don't think `ResultPartitionID` should depend on Netty classes. However, it seems `IntermediateResultPartitionID`, `ExecutionAttemptID`, and likely more classes already depend on Netty. Let's keep it as is and create a follow-up ticket for cleaning up the improper dependency.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))
+                        .collect(Collectors.toList());
+        this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) {
+        jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID);
+        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    }
+
+    public void releasePartition(ResultPartitionID resultPartitionID) {
+        tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+    }
+
+    public void unregisterJob(JobID jobID) {
+        List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID);
+        if (resultPartitionIDs != null) {
+            resultPartitionIDs.forEach(
+                    resultPartitionID ->
+                            tieredStorageMasterClient.releasePartition(
+                                    convertId(resultPartitionID)));
+        }
+    }
+
+    private TieredStorageConfiguration createTieredStorageConfiguration(Configuration conf) {
+        // TODO, from the configuration, get the configured options(i.e., remote storage path, the
+        // reserved storage size, etc.), then set them to the builder.

Review Comment:
   From this description, I think this method should not belong to `TieredInternalShuffleMaster`. Take remote storage path as an example, we need it not only for the master but also for the producer & consumer, and we definitely want to derive the same path from the configuration for all the components. Same for reserved storage size and likely other configurations.
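One way to follow this suggestion, sketched under assumptions: a single static factory that every component (master, producer, consumer) calls, so they all derive identical values from the same input. The `SharedTieredConfig` class, its option keys, and the use of a plain `Map<String, String>` in place of Flink's `Configuration` are all hypothetical.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical shared configuration, derived in exactly one place for all components. */
final class SharedTieredConfig {
    // Hypothetical option keys, for illustration only.
    static final String REMOTE_PATH_KEY = "tiered.remote-storage-path";
    static final String RESERVED_SIZE_KEY = "tiered.reserved-storage-bytes";

    final String remoteStoragePath;
    final long reservedStorageBytes;

    private SharedTieredConfig(String remoteStoragePath, long reservedStorageBytes) {
        this.remoteStoragePath = remoteStoragePath;
        this.reservedStorageBytes = reservedStorageBytes;
    }

    /** The single derivation point shared by the master, the producer and the consumer. */
    static SharedTieredConfig fromConfiguration(Map<String, String> conf) {
        String path = conf.get(REMOTE_PATH_KEY);
        long reserved = Long.parseLong(conf.getOrDefault(RESERVED_SIZE_KEY, "0"));
        return new SharedTieredConfig(path, reserved);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SharedTieredConfig)) {
            return false;
        }
        SharedTieredConfig that = (SharedTieredConfig) o;
        return reservedStorageBytes == that.reservedStorageBytes
                && Objects.equals(remoteStoragePath, that.remoteStoragePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(remoteStoragePath, reservedStorageBytes);
    }
}
```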



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072367


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Setups the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   It has been renamed to `write` in `TierStorageWriter`.
   Only two arguments, `consumerId` and `finishedBuffer`, are kept.
   `isEndOfPartition` and `segmentId` have been removed from the `write` method.
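A rough sketch of the renamed method's shape as described in this reply, with a toy in-memory implementation. `ByteBuffer` stands in for Flink's `Buffer`, `consumerId` is assumed to be an `int`, and the `boolean` return value is carried over from the old `emit` as an assumption; the actual `TierStorageWriter` interface may differ.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical shape of the renamed method; ByteBuffer stands in for Flink's Buffer. */
interface SketchTierStorageWriter {
    /** Returns whether this tier accepted the buffer (assumed, mirroring the old emit()). */
    boolean write(int consumerId, ByteBuffer finishedBuffer);
}

/** Toy in-memory tier that keeps buffers per consumer, for illustration only. */
class InMemoryTierWriter implements SketchTierStorageWriter {
    private final Map<Integer, List<ByteBuffer>> buffersPerConsumer = new HashMap<>();

    @Override
    public boolean write(int consumerId, ByteBuffer finishedBuffer) {
        buffersPerConsumer
                .computeIfAbsent(consumerId, ignore -> new ArrayList<>())
                .add(finishedBuffer);
        return true;
    }

    int numBuffers(int consumerId) {
        List<ByteBuffer> buffers = buffersPerConsumer.get(consumerId);
        return buffers == null ? 0 : buffers.size();
    }
}
```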





[GitHub] [flink] xintongsong closed pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong closed pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
URL: https://github.com/apache/flink/pull/22330




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058942


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The test utils for the tiered storage tests. */
+public class TieredStorageTestUtils {

Review Comment:
   Removed it.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a common entrypoint of emitting records to tiered store. These records will be emitted to
+ * the {@link BufferAccumulator} to accumulate and transform into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    public TieredStoreProducerImpl(
+            StorageTier[] storageTiers,
+            int numSubpartitions,
+            int bufferSize,
+            boolean isBroadcastOnly,
+            @Nullable BufferCompressor bufferCompressor) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+
+        this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   I have updated the producer constructor and made this a constructor argument.
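A minimal sketch of this constructor-injection change, with a hypothetical `RecordAccumulator` interface standing in for `BufferAccumulator` and `byte[]` standing in for serialized records. Passing the accumulator in, rather than calling `new BufferAccumulatorImpl()` inside the constructor, lets tests supply a recording fake.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for BufferAccumulator: receives records for a subpartition. */
interface RecordAccumulator {
    void receive(byte[] record, int targetSubpartition);
}

/** Sketch of a producer that receives its accumulator instead of constructing it. */
class SketchProducer {
    private final int numSubpartitions;
    private final RecordAccumulator accumulator;

    SketchProducer(int numSubpartitions, RecordAccumulator accumulator) {
        this.numSubpartitions = numSubpartitions;
        this.accumulator = accumulator;
    }

    void emitRecord(byte[] record, int targetSubpartition) {
        if (targetSubpartition < 0 || targetSubpartition >= numSubpartitions) {
            throw new IllegalArgumentException("Invalid subpartition: " + targetSubpartition);
        }
        // Delegate to whichever accumulator was injected (real or fake).
        accumulator.receive(record, targetSubpartition);
    }
}
```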





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498092


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {

Review Comment:
   `TieredStoreMode` has been removed; we now use `TierType` in `o.a.f.r.io.network.partition.tieredstore` instead.





[GitHub] [flink] TanYuxin-tyx commented on pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on PR #22330:
URL: https://github.com/apache/flink/pull/22330#issuecomment-1514523678

   @flinkbot run azure




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;
+
+    /** ID represented by a byte array. */
+    protected final byte[] id;
+
+    /** Pre-calculated hash-code for acceleration. */
+    protected final int hashCode;
+
+    public TieredStorageAbstractId(byte[] id) {
+        checkArgument(id != null, "Must be not null.");
+
+        this.id = id;
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public TieredStorageAbstractId(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        this.id = randomBytes(length);
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public byte[] getId() {

Review Comment:
   Renamed it.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180118466


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;

Review Comment:
   OK, good point. Added a `TieredInternalShuffleMaster` wrapper class.  
   All the tiered storage operations with the shuffle master should be wrapped in this class.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058204


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))
+                        .collect(Collectors.toList());
+        this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) {
+        jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID);
+        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    }
+
+    public void releasePartition(ResultPartitionID resultPartitionID) {
+        tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+    }
+
+    public void unregisterJob(JobID jobID) {
+        List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID);
+        if (resultPartitionIDs != null) {
+            resultPartitionIDs.forEach(
+                    resultPartitionID ->
+                            tieredStorageMasterClient.releasePartition(
+                                    convertId(resultPartitionID)));
+        }
+    }

Review Comment:
   OK. Because the exact jobId is not known when a result partition is released, I added a `partitionJobIds` map that records the jobId of each `resultPartitionId` that may later be released.
   
   1. When adding a partition, record the job-to-partition mapping in `jobPartitionIds` and the partition-to-job mapping in `partitionJobIds`.
   2. When releasing a partition, remove the partition id from both `jobPartitionIds` and `partitionJobIds`.
   3. When unregistering a job, remove all the partition ids of the job from both `jobPartitionIds` and `partitionJobIds`.
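A minimal sketch of the bookkeeping in steps 1 to 3, assuming plain `HashMap`s and a single-threaded caller. The class name `PartitionBookkeeping` and its release callback are hypothetical: the generic parameters stand in for Flink's `JobID` and `ResultPartitionID`, and the callback stands in for `TieredStorageMasterClient#releasePartition`. It illustrates the idea and is not the code that was eventually merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: J stands in for JobID, P for ResultPartitionID.
class PartitionBookkeeping<J, P> {

    // Job -> partitions of that job (the existing jobPartitionIds).
    private final Map<J, List<P>> jobPartitionIds = new HashMap<>();

    // Partition -> owning job (the new reverse map, partitionJobIds).
    private final Map<P, J> partitionJobIds = new HashMap<>();

    // Stands in for TieredStorageMasterClient#releasePartition.
    private final Consumer<P> releaser;

    PartitionBookkeeping(Consumer<P> releaser) {
        this.releaser = releaser;
    }

    // Step 1: record the mapping in both directions.
    void addPartition(J jobId, P partitionId) {
        jobPartitionIds.computeIfAbsent(jobId, ignored -> new ArrayList<>()).add(partitionId);
        partitionJobIds.put(partitionId, jobId);
    }

    // Step 2: find the owning job via the reverse map, then drop the partition from both maps.
    void releasePartition(P partitionId) {
        J jobId = partitionJobIds.remove(partitionId);
        if (jobId != null) {
            List<P> partitions = jobPartitionIds.get(jobId);
            if (partitions != null) {
                partitions.remove(partitionId);
                if (partitions.isEmpty()) {
                    jobPartitionIds.remove(jobId); // avoid keeping empty lists around
                }
            }
        }
        releaser.accept(partitionId);
    }

    // Step 3: release every remaining partition of the job and forget it in both maps.
    void unregisterJob(J jobId) {
        List<P> partitions = jobPartitionIds.remove(jobId);
        if (partitions != null) {
            for (P partitionId : partitions) {
                partitionJobIds.remove(partitionId);
                releaser.accept(partitionId);
            }
        }
    }

    int numTrackedPartitions() {
        return partitionJobIds.size();
    }
}
```

Keeping the reverse map means a release costs one lookup instead of a scan over every job's partition list; the price is that both maps must be updated together on every code path.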
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187054589


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))

Review Comment:
   Fixed it.
   Added an argument to `TierFactory#createMasterAgent` so that the separate `setup` call is no longer needed.
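A sketch of the constructor-injection shape this describes, under assumptions: all type names are hypothetical stand-ins (`ResourceRegistrySketch` for `ResourceRegistry`, `MasterAgentSketch` for `TierMasterAgent`, `TierFactorySketch` for `TierFactory`), and passing the registry itself as the new argument is a guess based on the removed `setup(resourceRegistry)` call. Besides removing a two-phase initialization, it also removes a side effect inside `Stream#peek`, which the JDK documents as existing mainly to support debugging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for ResourceRegistry.
class ResourceRegistrySketch {
    private final List<String> registered = new ArrayList<>();

    void register(String resourceName) {
        registered.add(resourceName);
    }

    List<String> registered() {
        return registered;
    }
}

// Hypothetical stand-in for TierMasterAgent: fully initialized by its constructor.
class MasterAgentSketch {
    private final String tierName;

    MasterAgentSketch(String tierName, ResourceRegistrySketch registry) {
        this.tierName = tierName;
        registry.register(tierName); // the work setup(resourceRegistry) used to do
    }

    String tierName() {
        return tierName;
    }
}

// Hypothetical stand-in for TierFactory: the registry is now an argument of createMasterAgent.
interface TierFactorySketch {
    MasterAgentSketch createMasterAgent(ResourceRegistrySketch registry);
}

class MasterAgents {
    // No peek()/setup() step: each agent is ready as soon as the factory returns it.
    static List<MasterAgentSketch> create(
            List<TierFactorySketch> factories, ResourceRegistrySketch registry) {
        return factories.stream()
                .map(factory -> factory.createMasterAgent(registry))
                .collect(Collectors.toList());
    }
}
```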





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072696


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers are independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+    TierStorage createPartitionTierStorage();

Review Comment:
   Fixed the issue.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,

Review Comment:
   Fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;

Review Comment:
   Fixed.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164073848


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+    /** Sets up the {@link TierStorage}. */
+    void setup() throws IOException;
+
+    /** Emits the finished {@link Buffer} to a specific subpartition. */
+    boolean emit(
+            int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)
+            throws IOException;
+
+    /** Closes the {@link TierStorage}, the opened channels should be closed. */
+    void close();
+
+    /** Releases the {@link TierStorage}, all resources should be released. */
+    void release();

Review Comment:
   I have updated it. `TierStorageWriter` now only has the `close` method, and `TierStorage` only has the `release` method.
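A minimal sketch of the split described here, assuming the writer is handed out by the storage and that closing a writer must not free data the storage still owns. All names are hypothetical, `byte[]` stands in for Flink's `Buffer`, and the `throws IOException` clause of the original `emit` is dropped to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer: only closes what it opened, never frees data owned by the storage.
interface TierStorageWriterSketch extends AutoCloseable {
    // Returns true if the buffer was accepted (assumed semantics).
    boolean emit(int targetSubpartition, byte[] finishedBuffer, boolean isEndOfPartition, int segmentId);

    @Override
    void close(); // narrowed: no checked exception
}

// Hypothetical storage: owns the data and is the only place that releases it.
interface TierStorageSketch {
    TierStorageWriterSketch createWriter();

    void release();
}

class InMemoryTierStorage implements TierStorageSketch {
    private final List<byte[]> buffers = new ArrayList<>();

    @Override
    public TierStorageWriterSketch createWriter() {
        return new TierStorageWriterSketch() {
            private boolean closed;

            @Override
            public boolean emit(
                    int targetSubpartition, byte[] finishedBuffer, boolean isEndOfPartition, int segmentId) {
                if (closed) {
                    throw new IllegalStateException("Writer is closed.");
                }
                buffers.add(finishedBuffer);
                return true;
            }

            @Override
            public void close() {
                closed = true; // closing the writer keeps the stored buffers
            }
        };
    }

    @Override
    public void release() {
        buffers.clear(); // frees everything the storage owns
    }

    int numBuffers() {
        return buffers.size();
    }
}
```

With this split, closing one writer cannot accidentally free shared resources; releasing is a single, explicit call on the storage.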





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161504837


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before starting a new segment, the writer side calls this method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);

Review Comment:
   The method has been removed from this API. It will move to the netty-based consumer API in a subsequent change.
   
   In that change, we will use the `producerId` instead.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161499623


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers are independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;

Review Comment:
   OK, it has been removed from the API. The method will move to the netty-based upstream consumer API in a subsequent PR.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;
+
+    /** ID represented by a byte array. */
+    protected final byte[] id;
+
+    /** Pre-calculated hash-code for acceleration. */
+    protected final int hashCode;
+
+    public TieredStorageAbstractId(byte[] id) {
+        checkArgument(id != null, "Must not be null.");
+
+        this.id = id;
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public TieredStorageAbstractId(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        this.id = randomBytes(length);
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public byte[] getId() {

Review Comment:
   Renamed it.





[GitHub] [flink] xintongsong commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1170781624


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+    private static final String TIER_STORE_DIR = "tiered-store";
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        Random random = new Random();
+        byte[] bytes = new byte[length];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    public static String bytesToHexString(byte[] bytes) {

Review Comment:
   Why not just use `StringUtils#byteToHexString`?
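For comparison, here is what a table-driven conversion built on the `HEX_CHARS` array in the diff typically looks like. The diff is truncated at the method signature, so the body below is an assumption, and `HexSketch` is a hypothetical name. Flink already ships `org.apache.flink.util.StringUtils#byteToHexString(byte[])` for the same job, which is the reviewer's point; one detail worth confirming before switching is the letter case of its output, since the table in the diff is uppercase.

```java
// Hypothetical body: the diff is truncated at the signature of bytesToHexString.
final class HexSketch {
    private static final char[] HEX_CHARS = {
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
    };

    private HexSketch() {}

    static String bytesToHexString(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int value = bytes[i] & 0xFF; // read the byte as unsigned 0..255
            out[2 * i] = HEX_CHARS[value >>> 4]; // high nibble
            out[2 * i + 1] = HEX_CHARS[value & 0x0F]; // low nibble
        }
        return new String(out);
    }
}
```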



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {

Review Comment:
   1. I'd suggest the name `TieredStorageBytesBasedDataIdentifier`
   2. I'd suggest making this an abstract class
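A sketch of what the suggested class could look like, with the name taken from the comment. The body is an assumption modeled on the fields in the diff (a byte array plus a pre-computed hash code); the `TieredStorageDataIdentifier` interface is omitted to keep it self-contained, and `ExamplePartitionId` is a hypothetical subclass. Making the class abstract means only concrete id types can be instantiated, and the `getClass()` check in `equals` keeps ids of different kinds with identical bytes from comparing equal.

```java
import java.io.Serializable;
import java.util.Arrays;

// Sketch of the suggested abstract, bytes-based identifier (illustrative, not the merged code).
abstract class TieredStorageBytesBasedDataIdentifier implements Serializable {

    private static final long serialVersionUID = 1L;

    // The identifier bytes (not defensively copied, as in the diff).
    protected final byte[] bytes;

    // Pre-computed hash code, since identifiers are frequently used as map keys.
    protected final int hashCode;

    protected TieredStorageBytesBasedDataIdentifier(byte[] bytes) {
        if (bytes == null) {
            throw new IllegalArgumentException("Must not be null.");
        }
        this.bytes = bytes;
        this.hashCode = Arrays.hashCode(bytes);
    }

    public byte[] getBytes() {
        return bytes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false; // different id kinds never compare equal
        }
        return Arrays.equals(bytes, ((TieredStorageBytesBasedDataIdentifier) o).bytes);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }
}

// Hypothetical concrete subclass.
class ExamplePartitionId extends TieredStorageBytesBasedDataIdentifier {
    private static final long serialVersionUID = 1L;

    ExamplePartitionId(byte[] bytes) {
        super(bytes);
    }
}
```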



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
+
+    public static byte[] randomBytes(int length) {

Review Comment:
   Why do we need this? It seems to be unused.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageJobId.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+
+/**
+ * Identifier of a job.
+ *
+ * <p>A job is equivalent to a job in Flink.
+ */
+public class TieredStorageJobId extends TieredStorageAbstractId {

Review Comment:
   I wonder why we need this. What does a job id mean for the tiered storage? Should we simply map Flink's JobId & PartitionId to a topic id in the tiered storage?
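One possible reading of the reviewer's suggestion, sketched under assumptions: fold both Flink identifiers into a single opaque topic id, so the tiered storage layer needs no job-id type of its own. `TopicIdMapping` and its length-prefixed layout (`[jobId length][jobId bytes][partitionId bytes]`) are hypothetical; they only show that such a mapping can stay reversible if a caller ever needs the job back.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical mapping from (job id bytes, partition id bytes) to one opaque topic id.
final class TopicIdMapping {

    private TopicIdMapping() {}

    // Layout: [jobId length as int][jobId bytes][partitionId bytes].
    static byte[] toTopicId(byte[] jobId, byte[] partitionId) {
        return ByteBuffer.allocate(Integer.BYTES + jobId.length + partitionId.length)
                .putInt(jobId.length)
                .put(jobId)
                .put(partitionId)
                .array();
    }

    static byte[] jobIdOf(byte[] topicId) {
        int jobIdLength = ByteBuffer.wrap(topicId).getInt();
        return Arrays.copyOfRange(topicId, Integer.BYTES, Integer.BYTES + jobIdLength);
    }

    static byte[] partitionIdOf(byte[] topicId) {
        int jobIdLength = ByteBuffer.wrap(topicId).getInt();
        return Arrays.copyOfRange(topicId, Integer.BYTES + jobIdLength, topicId.length);
    }
}
```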



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {

Review Comment:
   It took me a while to understand that this means the record should be broadcast while the result partition is not broadcast-only. This would be easier to understand if we had a JavaDoc for the public method that describes each of its arguments.
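   A minimal sketch of what such a JavaDoc could say, written against a hypothetical, simplified stand-in class (`ProducerClientSketch`) rather than the real `TieredStorageProducerClient`: subpartitions are plain `int` indexes, the `dataType` argument is dropped, and a list replaces the `BufferAccumulator`. The "one copy is enough for a broadcast-only partition" wording is inferred from the `else` branch of the quoted code, not confirmed by the PR.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical, simplified stand-in for the producer client, used only to illustrate the JavaDoc. */
   class ProducerClientSketch {

       /** Whether the whole result partition is broadcast-only. */
       private final boolean isBroadcastOnly;

       private final int numSubpartitions;

       /** Records (subpartition index, payload size) pairs instead of a real BufferAccumulator. */
       final List<int[]> received = new ArrayList<>();

       ProducerClientSketch(int numSubpartitions, boolean isBroadcastOnly) {
           this.numSubpartitions = numSubpartitions;
           this.isBroadcastOnly = isBroadcastOnly;
       }

       /**
        * Writes a record to the tiered storage.
        *
        * @param record the serialized record to write
        * @param subpartitionId the target subpartition; ignored when the record is broadcast and
        *     the result partition is not broadcast-only
        * @param isBroadcast whether the record should reach every subpartition. If the result
        *     partition itself is broadcast-only, a single copy is written; otherwise one copy is
        *     written to each subpartition.
        */
       void write(ByteBuffer record, int subpartitionId, boolean isBroadcast) {
           if (isBroadcast && !isBroadcastOnly) {
               for (int i = 0; i < numSubpartitions; ++i) {
                   // duplicate() shares the content but gives each copy its own position/limit
                   receive(record.duplicate(), i);
               }
           } else {
               receive(record, subpartitionId);
           }
       }

       private void receive(ByteBuffer record, int subpartitionId) {
           received.add(new int[] {subpartitionId, record.remaining()});
       }
   }
   ```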



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final ResourceRegistry resourceRegistry;

Review Comment:
   This is never used outside the constructor of `NettyShuffleMaster`. Then why keep the reference?
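   A minimal sketch of the alternative, using hypothetical stand-ins (`RegistrySketch`, `MasterClientSketch`, `ShuffleMasterSketch`) rather than the real `ResourceRegistry`, `TieredStorageMasterClient` and `NettyShuffleMaster`: the registry is a local variable in the constructor and only the component that uses it keeps a reference.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for ResourceRegistry: collects release actions. */
   class RegistrySketch {
       private final List<Runnable> releasers = new ArrayList<>();

       void register(Runnable releaser) {
           releasers.add(releaser);
       }

       int size() {
           return releasers.size();
       }
   }

   /** Hypothetical stand-in for the master client, the only real user of the registry. */
   class MasterClientSketch {
       private final RegistrySketch registry;

       MasterClientSketch(RegistrySketch registry) {
           this.registry = registry;
       }

       void addPartition(String partitionId) {
           registry.register(() -> { /* release the data of partitionId in all tiers */ });
       }

       int numRegistered() {
           return registry.size();
       }
   }

   /** The shuffle master creates the registry locally instead of storing it in a field. */
   class ShuffleMasterSketch {
       private final MasterClientSketch masterClient;

       ShuffleMasterSketch() {
           RegistrySketch registry = new RegistrySketch(); // not needed after construction
           this.masterClient = new MasterClientSketch(registry);
       }

       MasterClientSketch getMasterClient() {
           return masterClient;
       }
   }
   ```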



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   And why would the shuffle register a resource at the storage? Do you mean adding a partition?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   And the method name confuses me:
   1. Why do we need a method that returns only memory and disk tiers?
   2. This implicitly assumes `DEFAULT_MEMORY_DISK_TIER_TYPES` is memory + disk, which is dangerous. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;
+
+    /** ID represented by a byte array. */
+    protected final byte[] id;
+
+    /** Pre-calculated hash-code for acceleration. */
+    protected final int hashCode;
+
+    public TieredStorageAbstractId(byte[] id) {
+        checkArgument(id != null, "Must be not null.");
+
+        this.id = id;
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public TieredStorageAbstractId(int length) {
+        checkArgument(length > 0, "Must be positive.");
+
+        this.id = randomBytes(length);
+        this.hashCode = Arrays.hashCode(id);
+    }
+
+    public byte[] getId() {

Review Comment:
   I'd suggest the name `getBytes()`. We may also modify the name of the field.
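   A minimal sketch of the rename, using a hypothetical class `TieredStorageIdSketch` in place of `TieredStorageAbstractId`; the field becomes `bytes` and the getter `getBytes()`. It also uses `serialVersionUID = 1L`, as the Flink code style guide linked elsewhere in this review asks. Equality is based on the byte content, with the pre-calculated hash code from the original kept.

   ```java
   import java.io.Serializable;
   import java.util.Arrays;

   /** Hypothetical sketch of the abstract id with the field and getter renamed to "bytes". */
   class TieredStorageIdSketch implements Serializable {

       // Per the Flink code style guide, serialVersionUID starts at 1L.
       private static final long serialVersionUID = 1L;

       /** Raw bytes of the id. */
       private final byte[] bytes;

       /** Pre-calculated hash code, since ids are typically used as map keys. */
       private final int hashCode;

       TieredStorageIdSketch(byte[] bytes) {
           if (bytes == null) {
               throw new IllegalArgumentException("bytes must not be null");
           }
           this.bytes = bytes;
           this.hashCode = Arrays.hashCode(bytes);
       }

       byte[] getBytes() {
           return bytes;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           return Arrays.equals(bytes, ((TieredStorageIdSketch) o).bytes);
       }

       @Override
       public int hashCode() {
           return hashCode;
       }
   }
   ```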



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -29,6 +34,18 @@ public class TieredStorageUtils {
         '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
     };
 
+    public static List<TierFactory> getTierFactoriesFromConfiguration() {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                TieredStorageConfiguration.builder()
+                        .setTierTypes(TieredStorageConfiguration.memoryDiskTierTypes())
+                        .build();

Review Comment:
   This is weird.
   1. We get an array of types out of `TieredStorageConfiguration`, then set it back to `TieredStorageConfiguration#Builder`.
   2. The method `xxxFromConfiguration` does not accept a configuration as input. Instead, it creates a configuration internally.
   
   I assume 2 is a temporary state, which should be documented with a comment. In addition, how is `TieredStorageConfiguration` supposed to be generated in the final state?
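   A minimal sketch of a method that actually takes its configuration as input. Everything here is hypothetical: `TierFactorySketch` stands in for `TierFactory`, a plain `Map<String, String>` stands in for Flink's `Configuration`, and the `"tiers"` key is an illustrative option name, not an existing Flink option. The memory + disk fallback mirrors what the PR currently hard-codes.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical tier factory; the real TierFactory interface is not shown in this review. */
   interface TierFactorySketch {
       String name();
   }

   /** Sketch: derive tier factories from a configuration passed in, not one created inside. */
   final class TierFactoriesSketch {

       /** Hypothetical option key, used only for illustration. */
       static final String TIERS_KEY = "tiers";

       private TierFactoriesSketch() {}

       static List<TierFactorySketch> getTierFactoriesFromConfiguration(Map<String, String> config) {
           // Until a real option exists, default to memory + disk, as the PR currently does.
           String tiers = config.getOrDefault(TIERS_KEY, "memory,disk");
           List<TierFactorySketch> factories = new ArrayList<>();
           for (String tier : tiers.split(",")) {
               String name = tier.trim();
               factories.add(() -> name);
           }
           return Collections.unmodifiableList(factories);
       }
   }
   ```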



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
+/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */
+public class TieredStorageIdMappingUtils {
+
+    public static TieredStorageJobId convertId(JobID jobID) {
+        return new TieredStorageJobId(jobID.getBytes());
+    }
+
+    public static JobID convertId(TieredStorageJobId tieredStorageJobId) {
+        return new JobID(tieredStorageJobId.getId());
+    }
+
+    public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) {
+        return new TieredStorageTopicId(intermediateDataSetID.getBytes());
+    }
+
+    public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) {
+        return new IntermediateDataSetID(new AbstractID(topicId.getId()));
+    }
+
+    public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) {
+        ByteBuf byteBuf = Unpooled.buffer();

Review Comment:
   Better not to use netty classes.
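   A minimal sketch of the same kind of conversion with `java.nio.ByteBuffer` from the JDK instead of Netty's `ByteBuf`/`Unpooled`. The helper class is hypothetical, and the byte layout (data set id bytes, an `int` partition number, attempt id bytes) is an assumption for illustration, not the PR's actual serialization format.

   ```java
   import java.nio.ByteBuffer;

   /** Hypothetical helper: serialize partition id parts without any Netty classes. */
   final class PartitionIdBytesSketch {

       private PartitionIdBytesSketch() {}

       static byte[] toBytes(byte[] dataSetId, int partitionNum, byte[] attemptId) {
           // Size the buffer exactly, so the backing array is the serialized id.
           ByteBuffer buffer =
                   ByteBuffer.allocate(dataSetId.length + Integer.BYTES + attemptId.length);
           buffer.put(dataSetId).putInt(partitionNum).put(attemptId);
           return buffer.array();
       }

       static int readPartitionNum(byte[] bytes, int dataSetIdLength) {
           // Absolute get: reads 4 bytes (big-endian by default) at the given index.
           return ByteBuffer.wrap(bytes).getInt(dataSetIdLength);
       }
   }
   ```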



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;

Review Comment:
   The storage client probably should not understand the mapping between job and partition. It should be mapped in the shuffle components, e.g., shuffle master.
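   A minimal sketch of that split, with hypothetical classes and `String` ids for brevity: the shuffle-side component owns the job-to-partition map, while the storage-side client only offers `addPartition`/`releasePartition` and never sees a job id.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical storage-side client that only knows about partitions, not jobs. */
   class StorageMasterClientSketch {
       final Set<String> partitions = new HashSet<>();

       void addPartition(String partitionId) {
           partitions.add(partitionId);
       }

       void releasePartition(String partitionId) {
           partitions.remove(partitionId);
       }
   }

   /** Hypothetical shuffle-side component that owns the job-to-partition mapping. */
   class ShuffleMasterMappingSketch {
       private final Map<String, List<String>> jobPartitions = new HashMap<>();
       private final StorageMasterClientSketch storageClient;

       ShuffleMasterMappingSketch(StorageMasterClientSketch storageClient) {
           this.storageClient = storageClient;
       }

       void registerPartition(String jobId, String partitionId) {
           jobPartitions.computeIfAbsent(jobId, ignored -> new ArrayList<>()).add(partitionId);
           storageClient.addPartition(partitionId);
       }

       /** Releases every partition of a job; the storage client is only told partition ids. */
       void unregisterJob(String jobId) {
           List<String> partitionIds = jobPartitions.remove(jobId);
           if (partitionIds != null) {
               partitionIds.forEach(storageClient::releasePartition);
           }
       }
   }
   ```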



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   It doesn't make sense that a public method returns a private type.
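   A minimal sketch of one way out, using a hypothetical `TierConfigSketch` class: the enum returned by the public accessor is itself public, and the accessor returns an unmodifiable copy instead of the internal array, so callers can neither fail to use the type nor mutate the configured tiers.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   /** Hypothetical sketch: the type returned by a public method is itself public. */
   final class TierConfigSketch {

       /** Public, so callers of getTierTypes() can actually use the values. */
       public enum TierType {
           IN_MEM,
           IN_DISK,
           IN_REMOTE
       }

       private final List<TierType> tierTypes;

       TierConfigSketch(TierType... tierTypes) {
           // Defensive, unmodifiable copy: callers cannot alter the configured tiers.
           this.tierTypes = Collections.unmodifiableList(Arrays.asList(tierTypes.clone()));
       }

       public List<TierType> getTierTypes() {
           return tierTypes;
       }
   }
   ```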



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = -948472905048472823L;

Review Comment:
   All `serialVersionUID` should start at `1L`.
   https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-serialization



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   Shouldn't the resource also belong to a certain topic?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }

Review Comment:
   Need JavaDoc for enum values.
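   A minimal sketch of documented enum values, using a hypothetical `DocumentedTierType`. The descriptions are inferred from the constant names only and are not confirmed by the PR.

   ```java
   /** Hypothetical sketch of documented tier types; descriptions are inferred from the names. */
   enum DocumentedTierType {
       /** Data is kept in memory. */
       IN_MEM,

       /** Data is written to files on the local disk. */
       IN_DISK,

       /** Data is written to a remote storage system, such as a distributed file system. */
       IN_REMOTE
   }
   ```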



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }

Review Comment:
   Returning an array here doesn't make sense to me. How about replacing this with `getNumTiers()`, so that the valid indexes would be `[0, getNumTiers())`?
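   A minimal sketch of the suggested accessor on a hypothetical `NumTiersSketch` class, where tier names stand in for `TierType` values. Callers iterate with `for (int i = 0; i < config.getNumTiers(); i++)` instead of over a precomputed index array.

   ```java
   /** Hypothetical sketch: expose the number of tiers instead of an array of indexes. */
   final class NumTiersSketch {
       private final String[] tierNames; // stands in for TierType[]

       NumTiersSketch(String... tierNames) {
           this.tierNames = tierNames.clone();
       }

       /** Valid tier indexes are 0 (inclusive) to getNumTiers() (exclusive). */
       int getNumTiers() {
           return tierNames.length;
       }

       String getTierName(int tierIndex) {
           if (tierIndex < 0 || tierIndex >= tierNames.length) {
               throw new IndexOutOfBoundsException("Invalid tier index: " + tierIndex);
           }
           return tierNames[tierIndex];
       }
   }
   ```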



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {
+        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
+        resourceRegistry.registerResource(
+                partitionId, () -> tiers.forEach(TierMasterAgent::release));

Review Comment:
   This is probably correct, but counterintuitive. What is the resource being registered? This is more suitable for a releaser / cleaner pattern.
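   A minimal sketch of such a releaser pattern, using a hypothetical generic `PartitionReleaserSketch`: what is registered is explicitly a cleanup action keyed by its owner (e.g. a partition id), and releasing a key runs its actions once and forgets them.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical releaser registry: maps an owner key to the actions that clean it up. */
   final class PartitionReleaserSketch<K> {

       private final Map<K, List<Runnable>> releasers = new HashMap<>();

       /** Registers an action that releases what was allocated for {@code key}. */
       void registerReleaser(K key, Runnable releaser) {
           releasers.computeIfAbsent(key, ignored -> new ArrayList<>()).add(releaser);
       }

       /** Runs and forgets all release actions of {@code key}; unknown keys are a no-op. */
       void release(K key) {
           List<Runnable> actions = releasers.remove(key);
           if (actions != null) {
               actions.forEach(Runnable::run);
           }
       }
   }
   ```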



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final ResourceRegistry resourceRegistry;

Review Comment:
   And if we need to keep them, it might be better to have a wrapper class to hold all the tiered shuffle related components.
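   A minimal sketch of such a wrapper, with hypothetical stubs standing in for `ResourceRegistry` and `TieredStorageMasterClient`. Creating the components in one factory method also guarantees that they share the same registry, and `NettyShuffleMaster` would hold a single field.

   ```java
   import java.util.Objects;

   /** Hypothetical stand-in for ResourceRegistry. */
   class ResourceRegistryStub {}

   /** Hypothetical stand-in for TieredStorageMasterClient. */
   class MasterClientStub {
       final ResourceRegistryStub registry;

       MasterClientStub(ResourceRegistryStub registry) {
           this.registry = registry;
       }
   }

   /** Groups the tiered-shuffle components so the shuffle master holds a single field. */
   final class TieredShuffleMasterComponents {
       private final ResourceRegistryStub resourceRegistry;
       private final MasterClientStub masterClient;

       private TieredShuffleMasterComponents(ResourceRegistryStub registry, MasterClientStub client) {
           this.resourceRegistry = Objects.requireNonNull(registry);
           this.masterClient = Objects.requireNonNull(client);
       }

       /** Creates the components together, so they are guaranteed to share one registry. */
       static TieredShuffleMasterComponents create() {
           ResourceRegistryStub registry = new ResourceRegistryStub();
           return new TieredShuffleMasterComponents(registry, new MasterClientStub(registry));
       }

       ResourceRegistryStub getResourceRegistry() {
           return resourceRegistry;
       }

       MasterClientStub getMasterClient() {
           return masterClient;
       }
   }
   ```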



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {
+            for (int i = 0; i < numSubpartitions; ++i) {
+                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+            }
+        } else {
+            bufferAccumulator.receive(record, subpartitionId, dataType);
+        }
+    }
+
+    public void close() {
+        bufferAccumulator.close();
+        tierProducerAgents.forEach(TierProducerAgent::close);
+    }
+
+    public void writeFinishedBuffers(

Review Comment:
   This should be private. And I'd suggest the name `writeAccumulatedBuffers`. Otherwise, we need to explain what the concept of a `finished` buffer means.
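   A minimal sketch showing that the callback can be private even though it is handed to the accumulator, because a method reference to a private method is allowed. Both classes are hypothetical simplifications of `BufferAccumulator` and `TieredStorageProducerClient`, with `byte[]` standing in for `Buffer`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiConsumer;

   /** Hypothetical accumulator that hands accumulated buffers to a callback. */
   class AccumulatorSketch {
       private BiConsumer<Integer, List<byte[]>> flusher;

       void setup(BiConsumer<Integer, List<byte[]>> flusher) {
           this.flusher = flusher;
       }

       void flush(int subpartition, List<byte[]> buffers) {
           flusher.accept(subpartition, buffers);
       }
   }

   /** Hypothetical producer whose callback is private and named after what it does. */
   class ProducerCallbackSketch {
       final List<Integer> written = new ArrayList<>();

       ProducerCallbackSketch(AccumulatorSketch accumulator) {
           // A private method can still be passed as a method reference.
           accumulator.setup(this::writeAccumulatedBuffers);
       }

       private void writeAccumulatedBuffers(int subpartition, List<byte[]> buffers) {
           written.add(buffers.size());
       }
   }
   ```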



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180096713


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }

Review Comment:
   Removed it, because `TieredStorageConfiguration#getTierFactories` already provides the factories, so this method is no longer needed.
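A minimal sketch of why a separate index array becomes redundant, assuming one factory is returned per configured tier in tier order, so a tier's index is simply its position in the list. `TierFactory`, `tierName()` and `createDefault()` are hypothetical placeholders; the real factories would create tier agents rather than return a name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-ins for the tiered storage configuration classes. */
final class TierFactoriesSketch {

    /** Placeholder for a per-tier factory. */
    interface TierFactory {
        String tierName();
    }

    enum TierType {
        IN_MEM,
        IN_DISK,
        IN_REMOTE
    }

    static final class StorageConfiguration {
        private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
                new TierType[] {TierType.IN_MEM, TierType.IN_DISK};

        private final TierType[] tierTypes;

        private StorageConfiguration(TierType[] tierTypes) {
            this.tierTypes = tierTypes;
        }

        /** The tier types are fixed internally in this sketch; callers cannot set them. */
        static StorageConfiguration createDefault() {
            return new StorageConfiguration(DEFAULT_MEMORY_DISK_TIER_TYPES.clone());
        }

        /**
         * One factory per configured tier, in tier order. A tier's index is its position in
         * the returned list, so no separate index array is needed.
         */
        List<TierFactory> getTierFactories() {
            List<TierFactory> factories = new ArrayList<>(tierTypes.length);
            for (TierType tierType : tierTypes) {
                String name = tierType.name();
                factories.add(() -> name);
            }
            return Collections.unmodifiableList(factories);
        }
    }
}
```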




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502932


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);
+
+    void close();
+
+    void release();

Review Comment:
   Fixed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187053970


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredResource.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/** The resource (e.g., local files, remote storage files, etc.) for the Tiered Storage. */
+public interface TieredResource {

Review Comment:
   Renamed it.
   I also renamed `ResourceRegistry` to `TieredStorageResourceRegistry`.
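A minimal sketch of what a registry like this might do, assuming resources are registered under an owner id (for example a partition id) and released together when that owner is cleared. The comment does not state the interface's new name, so the sketch uses the placeholder `StorageResource`; `registerResource`, `releaseResourcesFor` and `numOwners` are likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a resource registry keyed by an owner id (e.g., a partition id). */
final class ResourceRegistrySketch {

    /** Placeholder for the renamed TieredResource interface (local files, remote files, ...). */
    interface StorageResource {
        void release();
    }

    static final class ResourceRegistry {
        private final Map<String, List<StorageResource>> resourcesByOwner = new HashMap<>();

        synchronized void registerResource(String ownerId, StorageResource resource) {
            resourcesByOwner.computeIfAbsent(ownerId, ignored -> new ArrayList<>()).add(resource);
        }

        /** Releases and forgets all resources of the owner; a no-op for unknown owners. */
        void releaseResourcesFor(String ownerId) {
            List<StorageResource> owned;
            synchronized (this) {
                owned = resourcesByOwner.remove(ownerId);
            }
            // Release outside the lock so slow I/O does not block other registrations.
            if (owned != null) {
                owned.forEach(StorageResource::release);
            }
        }

        synchronized int numOwners() {
            return resourcesByOwner.size();
        }
    }
}
```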




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a common entrypoint of emitting records to tiered store. These records will be emitted to
+ * the {@link BufferAccumulator} to accumulate and transform into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    public TieredStoreProducerImpl(
+            StorageTier[] storageTiers,
+            int numSubpartitions,
+            int bufferSize,
+            boolean isBroadcastOnly,
+            @Nullable BufferCompressor bufferCompressor) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+
+        this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   I have updated the producer constructor and made this a constructor argument.
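A minimal sketch of the constructor-injection pattern described here, assuming the producer only delegates records to the accumulator it is given. `Producer`, `emit` and `RecordingAccumulator` are hypothetical simplifications; the fake shows the kind of test double that injection makes easy to pass in.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins; not the Flink classes from the PR. */
final class AccumulatorInjectionSketch {

    interface BufferAccumulator {
        void receive(ByteBuffer serializedRecord, int subpartitionId);

        void close();
    }

    static final class Producer {
        private final int numSubpartitions;
        private final BufferAccumulator bufferAccumulator;

        // The accumulator is injected instead of being created with `new` inside the constructor.
        Producer(int numSubpartitions, BufferAccumulator bufferAccumulator) {
            if (numSubpartitions <= 0) {
                throw new IllegalArgumentException("numSubpartitions must be positive");
            }
            this.numSubpartitions = numSubpartitions;
            this.bufferAccumulator = bufferAccumulator;
        }

        void emit(ByteBuffer serializedRecord, int subpartitionId) {
            if (subpartitionId < 0 || subpartitionId >= numSubpartitions) {
                throw new IllegalArgumentException("Invalid subpartition: " + subpartitionId);
            }
            bufferAccumulator.receive(serializedRecord, subpartitionId);
        }

        void close() {
            bufferAccumulator.close();
        }
    }

    /** A recording fake that a unit test can hand to the producer. */
    static final class RecordingAccumulator implements BufferAccumulator {
        final List<Integer> receivedSubpartitions = new ArrayList<>();
        boolean closed;

        @Override
        public void receive(ByteBuffer serializedRecord, int subpartitionId) {
            receivedSubpartitions.add(subpartitionId);
        }

        @Override
        public void close() {
            closed = true;
        }
    }
}
```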





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502777


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+     */
+    enum TierType implements TieredStoreMode {
+        IN_CACHE,
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    /**
+     * Currently, only these tier combinations are supported. If the configured tiers is contained
+     * in the following combinations, an exception will be thrown.
+     */
+    enum SupportedTierCombinations implements TieredStoreMode {
+        MEMORY,
+        MEMORY_DISK,
+        MEMORY_DISK_REMOTE,

Review Comment:
   Fixed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500038


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);

Review Comment:
   This method has been removed from this API. It will be added to the Netty-based upstream consumer API in a subsequent PR.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+    void setup() throws IOException;
+
+    /** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+    TierWriter createPartitionTierWriter();
+
+    /** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+    TierReaderView createTierReaderView(
+            int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+    /**
+     * Before start a new segment, the writer side calls the method to check whether the {@link
+     * StorageTier} can store the next segment.
+     */
+    boolean canStoreNextSegment(int subpartitionId);
+
+    /**
+     * Before reading the segment data, the consumer calls the method to check whether the {@link
+     * StorageTier} has the segment.
+     */
+    boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+    /** Sets the metric group. */
+    void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);

Review Comment:
   Fixed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498297


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+    /**
+     * The shuffle records will be accumulated to the finished buffer before writing to the
+     * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.

Review Comment:
   `IN_CACHE` is removed from `TierType`.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180094931


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageJobId.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+
+/**
+ * Identifier of a job.
+ *
+ * <p>A job is equivalent to a job in Flink.
+ */
+public class TieredStorageJobId extends TieredStorageAbstractId {

Review Comment:
   Removed it. I now use the partition id directly to release the resources, because the partition id is unique.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }

Review Comment:
   Fixed.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180110166


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -29,6 +34,18 @@ public class TieredStorageUtils {
         '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
     };
 
+    public static List<TierFactory> getTierFactoriesFromConfiguration() {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                TieredStorageConfiguration.builder()
+                        .setTierTypes(TieredStorageConfiguration.memoryDiskTierTypes())
+                        .build();

Review Comment:
   Removed `getTieredFactories` from `TieredStorageUtils`. `TieredStorageUtils` is no longer used, so I removed it as well. The factories can now be obtained with `TieredStorageConfiguration#getTierFactories`.
   
   Also removed `setTierTypes`, because in the first version the tier types do not need to be set from outside the configuration.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

Posted by "TanYuxin-tyx (via GitHub)" <gi...@apache.org>.
TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180116664


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   This does not need to be a topic id, because the result partition id is already unique.
   
   It is unique because it contains an `IntermediateResultPartitionID` field, and that `IntermediateResultPartitionID` already includes the `IntermediateDataSetID`.
   
   In addition, these methods have been renamed to `addPartition` and `releasePartition`.
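A minimal sketch of the idea, assuming each partition id embeds its data set id, so the master client can key everything by partition id without a separate job-level map. `PartitionId`, `TierMasterAgent`, `TrackingAgent` and `MasterClient` here are simplified, hypothetical stand-ins, not the PR's classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch; the id and agent types are simplified stand-ins, not Flink classes. */
final class MasterClientSketch {

    /** A partition id that already embeds its data set id, so it is unique on its own. */
    static final class PartitionId {
        final String dataSetId;
        final int partitionNum;

        PartitionId(String dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PartitionId)) {
                return false;
            }
            PartitionId that = (PartitionId) o;
            return partitionNum == that.partitionNum && dataSetId.equals(that.dataSetId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(dataSetId, partitionNum);
        }
    }

    interface TierMasterAgent {
        void addPartition(PartitionId partitionId);

        void releasePartition(PartitionId partitionId);
    }

    /** Tracks partitions in a set; stands in for a real tier's bookkeeping. */
    static final class TrackingAgent implements TierMasterAgent {
        final Set<PartitionId> partitions = new HashSet<>();

        @Override
        public void addPartition(PartitionId partitionId) {
            partitions.add(partitionId);
        }

        @Override
        public void releasePartition(PartitionId partitionId) {
            partitions.remove(partitionId);
        }
    }

    /** No job-id map is needed: the partition id alone is a sufficient key. */
    static final class MasterClient {
        private final List<TierMasterAgent> tiers;

        MasterClient(List<TierMasterAgent> tiers) {
            this.tiers = new ArrayList<>(tiers);
        }

        void addPartition(PartitionId partitionId) {
            tiers.forEach(tier -> tier.addPartition(partitionId));
        }

        void releasePartition(PartitionId partitionId) {
            tiers.forEach(tier -> tier.releasePartition(partitionId));
        }
    }
}
```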


