Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/20 02:38:47 UTC

[GitHub] [flink] reswqa opened a new pull request, #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

reswqa opened a new pull request, #21122:
URL: https://github.com/apache/flink/pull/21122

   ## What is the purpose of the change
   
   *Hybrid shuffle does not support multiple consumers for a single subpartition's data. This causes several limitations, such as the inability to support partition reuse and speculative execution. In particular, it cannot support broadcast optimization: hybrid shuffle currently writes multiple copies of broadcast data, which wastes memory and disk space and slows down the shuffle write phase. Ideally, with the full spilling strategy, any broadcast data (record or event) should be written only once in memory, and likewise only once on disk.*
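
As a rough illustration of the goal described above (a simplified sketch, not the code in this PR): each broadcast record is stored once, and every consumer only keeps its own read position. `SharedBroadcastLog` and all of its methods are hypothetical names, and plain strings stand in for buffers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: one stored copy of each broadcast record, one read cursor per consumer. */
final class SharedBroadcastLog {
    // Each broadcast record is appended exactly once, regardless of the number of consumers.
    private final List<String> records = new ArrayList<>();
    // Per-consumer position of the next record to read.
    private final Map<Integer, Integer> nextIndexByConsumer = new HashMap<>();

    void append(String record) {
        records.add(record);
    }

    void registerConsumer(int consumerId) {
        if (nextIndexByConsumer.putIfAbsent(consumerId, 0) != null) {
            throw new IllegalStateException("Consumer id must be unique: " + consumerId);
        }
    }

    /** Returns the next unread record for the given consumer, if any. */
    Optional<String> poll(int consumerId) {
        Integer next = nextIndexByConsumer.get(consumerId);
        if (next == null) {
            throw new IllegalStateException("Unknown consumer: " + consumerId);
        }
        if (next >= records.size()) {
            return Optional.empty();
        }
        nextIndexByConsumer.put(consumerId, next + 1);
        return Optional.of(records.get(next));
    }

    /** Number of records held in storage; independent of the number of consumers. */
    int storedCopies() {
        return records.size();
    }
}
```

With two registered consumers and one appended record, both consumers can read the record while only one copy is stored.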
   
   ## Brief change log
   
     - *the full spilling strategy no longer considers the consumer offset.*
     - *supports multiple consumers.*
     - *supports broadcast optimization.*
   
   
   ## Verifying this change
   
   This change added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1294359915

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1001497856


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+            subpartitionViewOperationsMap;

Review Comment:
   Definition should be documented.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java:
##########
@@ -79,4 +79,24 @@ enum ConsumeStatus {
         /** The buffer is either consumed or not consumed. */
         ALL
     }
+
+    /** This class represents a pair of {@link ConsumeStatus} and consumer id. */
+    class ConsumeStatusWithId {
+        public static final ConsumeStatusWithId ALL_CONSUME_STATUS =
+                new ConsumeStatusWithId(ConsumeStatus.ALL, -1);
+
+        ConsumeStatus status;
+
+        int consumerId;

Review Comment:
   I think we need a dedicated class for the consumer id, where we can define the special values such as `ANY` and `SINGLE_CONSUMER`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+            subpartitionViewOperationsMap;

Review Comment:
   Can we replace the outer `List` with an array here, to align with `subpartitionMemoryDataManagers`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+            subpartitionViewOperationsMap;

Review Comment:
   Why is the inner collection a `Map` rather than a `List`? With a list, we won't need the `consumerIdCounter` in `HsResultPartition`.
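
A minimal sketch of the list-based alternative raised in this comment, assuming views are only ever appended: the index in the inner list can then serve as the consumer id, so no separate counter is needed. `ViewRegistry` and its methods are hypothetical names, not Flink classes, and the outer collection is a plain list here only for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: the position in the inner list doubles as the consumer id. */
final class ViewRegistry<V> {
    // Outer list: one entry per subpartition. Inner list: the views registered for it.
    private final List<List<V>> viewsBySubpartition = new ArrayList<>();

    ViewRegistry(int numSubpartitions) {
        for (int i = 0; i < numSubpartitions; i++) {
            viewsBySubpartition.add(new ArrayList<>());
        }
    }

    /** Registers a view and returns its consumer id, which is simply its list index. */
    synchronized int register(int subpartitionId, V view) {
        List<V> views = viewsBySubpartition.get(subpartitionId);
        views.add(view);
        return views.size() - 1;
    }

    synchronized V get(int subpartitionId, int consumerId) {
        return viewsBySubpartition.get(subpartitionId).get(consumerId);
    }
}
```

One trade-off of this shape: releasing a consumer cannot remove its element without shifting the ids of later consumers, so a released slot would have to be cleared in place instead.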



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -161,15 +164,15 @@ public void append(ByteBuffer record, int targetChannel, Buffer.DataType dataTyp
      * subpartition.
      */
     public HsDataView registerSubpartitionView(
-            int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
+            int subpartitionId,
+            int consumerId,
+            HsSubpartitionViewInternalOperations viewOperations) {
         HsSubpartitionViewInternalOperations oldView =
-                subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
-        if (oldView != null) {
-            LOG.debug(
-                    "subpartition : {} register subpartition view will replace old view. ",
-                    subpartitionId);
-        }
-        return getSubpartitionMemoryDataManager(subpartitionId);
+                subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, viewOperations);
+        Preconditions.checkState(
+                oldView == null, "Each subpartition view should have unique consumerId.");
+        return getSubpartitionMemoryDataManager(subpartitionId)

Review Comment:
   Shall we also check that the selective strategy does not have multiple consumers?





[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1294531619

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1294395156

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1005572493


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId DEFAULT = new HsConsumerId(0);

Review Comment:
   The JavaDoc needs to be updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java:
##########
@@ -295,4 +297,15 @@ private HsSpillingStrategy getSpillingStrategy(
                 throw new IllegalConfigurationException("Illegal spilling strategy.");
         }
     }
+
+    private void checkMultipleConsumerIsAllowed(
+            HsConsumerId newConsumerId, HybridShuffleConfiguration hybridShuffleConfiguration) {
+        if (hybridShuffleConfiguration.getSpillingStrategyType()
+                == SpillingStrategyType.SELECTIVE) {
+            checkState(
+                    newConsumerId == HsConsumerId.DEFAULT,

Review Comment:
   I'd suggest checking that the last id is `null`, rather than that the current id is `DEFAULT`.
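
The suggested check could look roughly like the sketch below. It assumes the caller keeps the id handed to the previous consumer of the subpartition and passes `null` when there was none. `MultipleConsumerCheck` and its nested enum are stand-ins, not the Flink classes from the diff.

```java
/** Hypothetical sketch of the suggested check; the names are stand-ins, not Flink classes. */
final class MultipleConsumerCheck {
    enum SpillingStrategyType { FULL, SELECTIVE }

    /**
     * @param lastConsumerId id handed to the previous consumer of the subpartition, or null
     *     if no consumer has been registered yet.
     */
    static void checkMultipleConsumerIsAllowed(
            Object lastConsumerId, SpillingStrategyType strategyType) {
        // A non-null last id means a consumer already exists for this subpartition.
        if (strategyType == SpillingStrategyType.SELECTIVE && lastConsumerId != null) {
            throw new IllegalStateException(
                    "Multiple consumers are not allowed for the selective spilling strategy.");
        }
    }
}
```

Checking the previous id rather than comparing the new id with a constant keeps the check independent of how ids are generated.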



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -216,6 +218,11 @@ public ResultPartition create(
             }
         } else if (type == ResultPartitionType.HYBRID_FULL
                 || type == ResultPartitionType.HYBRID_SELECTIVE) {
+            if (isBroadcast) {
+                // for broadcast result partition, it can be optimized to always use full spilling
+                // strategy to significantly reduce shuffle data writing cost.
+                type = ResultPartitionType.HYBRID_FULL;

Review Comment:
   It would be nice to log a message when overwriting the configured type.
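
One possible shape for this, as a sketch only: resolve the effective type in a small helper and log when the configured value is replaced. `HybridTypeResolver` and its enum are hypothetical, and `java.util.logging` is used purely to keep the example dependency-free; it is not a claim about the logger the real class uses.

```java
import java.util.logging.Logger;

/** Hypothetical sketch; the enum mirrors only the two hybrid types named in the diff. */
final class HybridTypeResolver {
    enum PartitionType { HYBRID_FULL, HYBRID_SELECTIVE }

    // Assumption: java.util.logging stands in for whatever logger the real class has.
    private static final Logger LOG = Logger.getLogger(HybridTypeResolver.class.getName());

    /** Returns the type to use, forcing full spilling for broadcast result partitions. */
    static PartitionType resolve(PartitionType configured, boolean isBroadcast) {
        if (isBroadcast && configured == PartitionType.HYBRID_SELECTIVE) {
            LOG.info("Broadcast result partition: overriding HYBRID_SELECTIVE with HYBRID_FULL.");
            return PartitionType.HYBRID_FULL;
        }
        return configured;
    }
}
```

The message is logged only when the value actually changes, so a broadcast partition that is already configured as full spilling stays silent.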





[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296144735

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1004433748


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0);
+
+    /** This is a unique field for each consumer of a single subpartition. */
+    private final int id;
+
+    public HsConsumerId(int id) {
+        this.id = id;
+    }

Review Comment:
   Good suggestion.





[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296137584

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1003186537


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0);

Review Comment:
   I'd suggest naming this `DEFAULT`.
   - It is guaranteed that if there's only one consumer, this id will be used. However, the converse is not guaranteed: this id may also be used when there are multiple consumers.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java:
##########
@@ -79,4 +79,24 @@ enum ConsumeStatus {
         /** The buffer is either consumed or not consumed. */
         ALL
     }
+
+    /** This class represents a pair of {@link ConsumeStatus} and consumer id. */
+    class ConsumeStatusWithId {
+        public static final ConsumeStatusWithId ALL_CONSUME_STATUS =

Review Comment:
   I'd suggest the name `ALL_ALL` or `ANY_ANY`, following the pattern `ConsumeStatus_ConsumerID`. If more special values are needed in the future, we can follow the same pattern.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0);
+
+    /** This is a unique field for each consumer of a single subpartition. */
+    private final int id;
+
+    public HsConsumerId(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }

Review Comment:
   We should not expose the internal `id`. The getter is only used for creating a new id from an existing one.
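
A sketch of how the raw value could stay private, assuming a static factory that derives the next id from the previous one. `ConsumerIdSketch` and `newId` are illustrative names rather than the final API of `HsConsumerId`.

```java
import java.util.Objects;

/** Hypothetical sketch of an id type that never exposes its raw int value. */
final class ConsumerIdSketch {
    static final ConsumerIdSketch DEFAULT = new ConsumerIdSketch(0);

    private final int id;

    // Private constructor: callers can only obtain ids through DEFAULT or newId.
    private ConsumerIdSketch(int id) {
        this.id = id;
    }

    /** Returns DEFAULT for the first consumer, otherwise an id one higher than the last one. */
    static ConsumerIdSketch newId(ConsumerIdSketch lastId) {
        return lastId == null ? DEFAULT : new ConsumerIdSketch(lastId.id + 1);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ConsumerIdSketch && ((ConsumerIdSketch) o).id == id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public String toString() {
        return "ConsumerIdSketch{" + id + "}";
    }
}
```

Callers then write `nextId = ConsumerIdSketch.newId(lastId)` and never perform arithmetic on the id themselves.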



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataConsumer.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents a unique consumer which consume a specific subpartition's data in memory.
+ */
+public class HsSubpartitionMemoryDataConsumer implements HsDataView {
+
+    @GuardedBy("consumerLock")
+    private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>();
+
+    private final Lock consumerLock;
+
+    private final Lock resultPartitionLock;
+
+    private final HsConsumerId consumerId;
+
+    private final int subpartitionId;
+
+    private final HsMemoryDataManagerOperation memoryDataManagerOperation;
+
+    public HsSubpartitionMemoryDataConsumer(
+            Lock resultPartitionLock,
+            Lock consumerLock,
+            int subpartitionId,
+            HsConsumerId consumerId,
+            HsMemoryDataManagerOperation memoryDataManagerOperation) {
+        this.resultPartitionLock = resultPartitionLock;
+        this.consumerLock = consumerLock;
+        this.subpartitionId = subpartitionId;
+        this.consumerId = consumerId;
+        this.memoryDataManagerOperation = memoryDataManagerOperation;
+    }
+
+    @GuardedBy("consumerLock")
+    // this method must be called with consumerLock.writeLock.

Review Comment:
   This comment doesn't seem correct. How does a `Lock` have `writeLock`?
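
For reference, a small sketch using only `java.util.concurrent.locks`: `writeLock()` is a method of `ReadWriteLock`, and the object it returns is a plain `Lock`. The class name `LockKinds` is hypothetical.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Sketch using only java.util.concurrent.locks; the class name is hypothetical. */
final class LockKinds {
    // A ReadWriteLock hands out two Lock views: readLock() and writeLock().
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // Once a field is typed as Lock, only lock()/unlock() and related methods remain;
    // there is no writeLock() to call on it.
    private final Lock consumerLock = readWriteLock.writeLock();

    private int counter;

    /** Increments the counter while holding consumerLock and returns the new value. */
    int incrementUnderLock() {
        consumerLock.lock();
        try {
            return ++counter;
        } finally {
            consumerLock.unlock();
        }
    }
}
```

A comment on a field of type `Lock` can therefore only say that the lock must be held, not that its write lock must be held.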



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataConsumer.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents a unique consumer which consume a specific subpartition's data in memory.
+ */
+public class HsSubpartitionMemoryDataConsumer implements HsDataView {
+
+    @GuardedBy("consumerLock")
+    private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>();
+
+    private final Lock consumerLock;
+
+    private final Lock resultPartitionLock;
+
+    private final HsConsumerId consumerId;
+
+    private final int subpartitionId;
+
+    private final HsMemoryDataManagerOperation memoryDataManagerOperation;
+
+    public HsSubpartitionMemoryDataConsumer(
+            Lock resultPartitionLock,
+            Lock consumerLock,
+            int subpartitionId,
+            HsConsumerId consumerId,
+            HsMemoryDataManagerOperation memoryDataManagerOperation) {
+        this.resultPartitionLock = resultPartitionLock;
+        this.consumerLock = consumerLock;
+        this.subpartitionId = subpartitionId;
+        this.consumerId = consumerId;
+        this.memoryDataManagerOperation = memoryDataManagerOperation;
+    }
+
+    @GuardedBy("consumerLock")
+    // this method must be called with consumerLock.writeLock.
+    public void addInitialBuffers(Deque<HsBufferContext> buffers) {
+        unConsumedBuffers.addAll(buffers);
+    }
+
+    // this method only called from subpartitionMemoryDataManager with write lock.
+    @SuppressWarnings("FieldAccessNotGuarded")
+    public boolean addBuffer(HsBufferContext bufferContext) {
+        unConsumedBuffers.add(bufferContext);
+        trimHeadingReleasedBuffers();
+        return unConsumedBuffers.size() <= 1;
+    }
+
+    /**
+     * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so,
+     * return the buffer and backlog.
+     *
+     * @param toConsumeIndex index of buffer to be consumed.
+     * @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer
+     *     and backlog. Otherwise, return {@link Optional#empty()}.
+     */
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
+    // subpartitionLock.
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
+        Optional<Tuple2<HsBufferContext, Buffer.DataType>> bufferAndNextDataType =
+                callWithLock(
+                        () -> {
+                            if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
+                                return Optional.empty();
+                            }
+
+                            HsBufferContext bufferContext =
+                                    checkNotNull(unConsumedBuffers.pollFirst());
+                            bufferContext.consumed(consumerId);
+                            Buffer.DataType nextDataType =
+                                    peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
+                            return Optional.of(Tuple2.of(bufferContext, nextDataType));
+                        });
+
+        bufferAndNextDataType.ifPresent(
+                tuple ->
+                        memoryDataManagerOperation.onBufferConsumed(
+                                tuple.f0.getBufferIndexAndChannel()));
+        return bufferAndNextDataType.map(
+                tuple ->
+                        new ResultSubpartition.BufferAndBacklog(
+                                tuple.f0.getBuffer().readOnlySlice(),
+                                getBacklog(),
+                                tuple.f1,
+                                toConsumeIndex));
+    }
+
+    /**
+     * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed next time.
+     * If so, return the next buffer's data type.
+     *
+     * @param nextToConsumeIndex index of the buffer to be consumed next time.
+     * @return If the head of {@link #unConsumedBuffers} is target, return the buffer's data type.
+     *     Otherwise, return {@link Buffer.DataType#NONE}.
+     */
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
+    // consumerLock.
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
+        return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
+    }
+
+    @GuardedBy("consumerLock")
+    private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
+        return checkFirstUnConsumedBufferIndex(nextToConsumeIndex)
+                ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType()
+                : Buffer.DataType.NONE;
+    }
+
+    @GuardedBy("consumerLock")
+    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
+        trimHeadingReleasedBuffers();
+        return !unConsumedBuffers.isEmpty()
+                && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
+                        == expectedBufferIndex;
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the
+    // result greater than or equal to the actual backlog, but obtaining an accurate backlog will
+    // bring too much extra overhead.
+    @Override
+    public int getBacklog() {
+        return unConsumedBuffers.size();
+    }
+
+    @Override
+    public void releaseDataView() {
+        memoryDataManagerOperation.onConsumerRelease(subpartitionId, consumerId);
+    }
+
+    @GuardedBy("consumerLock")
+    private void trimHeadingReleasedBuffers() {
+        while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) {
+            unConsumedBuffers.removeFirst();
+        }
+    }
+
+    private <E extends Exception> void runWithLock(ThrowingRunnable<E> runnable) throws E {

Review Comment:
   Unused



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -106,14 +111,19 @@ public HsMemoryDataManager(
         ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
         this.lock = readWriteLock.writeLock();
 
+        this.subpartitionViewOperationsMap = new ArrayList<>(numSubpartitions);
+        // currently, only HsFullSpillingStrategy supports multiple consumer.
+        final boolean allowMultipleConsumer = spillStrategy instanceof HsFullSpillingStrategy;

Review Comment:
   It should be `HsResultPartition`, rather than `HsMemoryDataManager` or `HsSubpartitionMemoryDataManager`, to decide whether multiple consumers should be allowed. This is a property of the entire result partition, not the memory component.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java:
##########
@@ -76,6 +78,8 @@ public class HsResultPartition extends ResultPartition {
 
     @Nullable private HsMemoryDataManager memoryDataManager;
 
+    private final boolean enableBroadcastOptimize;

Review Comment:
   The name is confusing. I'd suggest `isBroadcastOnly`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java:
##########
@@ -186,17 +193,18 @@ public ResultSubpartitionView createSubpartitionView(
             throw new PartitionNotFoundException(getPartitionId());
         }
 
+        // assign a unique id for each consumer, now it is guaranteed by the value that is one
+        // higher than the last consumerId's id field.
+        HsConsumerId consumerId = nextConsumerIds[subpartitionId];
+        nextConsumerIds[subpartitionId] = new HsConsumerId(consumerId.getId() + 1);

Review Comment:
   See my other comments for consumer id generation and multiple consumer checking.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -240,6 +242,8 @@ public ResultPartition create(
                                                             .SpillingStrategyType.SELECTIVE)
                                     .build(),
                             bufferCompressor,
+                            // Only hybrid full result partition support broadcast optimization.
+                            isBroadcast && type == ResultPartitionType.HYBRID_FULL,

Review Comment:
   Instead of disabling this optimization for selective spilling, I'd suggest always using full spilling when the result partition is broadcast.
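
The suggestion above could be sketched roughly as follows. This is only an illustration, not the code in the PR: the enums and the `choose` helper are hypothetical stand-ins, and `HYBRID_SELECTIVE` is assumed here as the counterpart of the `HYBRID_FULL` type that appears in the diff.

```java
/** Hypothetical stand-in for the spilling strategy type used by the hybrid shuffle configuration. */
enum SpillingStrategyTypeSketch {
    FULL,
    SELECTIVE
}

/** Hypothetical stand-in for the hybrid result partition types. */
enum PartitionTypeSketch {
    HYBRID_FULL,
    HYBRID_SELECTIVE
}

final class SpillingStrategyChooserSketch {
    private SpillingStrategyChooserSketch() {}

    /**
     * Broadcast partitions always get full spilling, so the broadcast optimization never has to
     * be switched off; other partitions follow their configured type.
     */
    static SpillingStrategyTypeSketch choose(PartitionTypeSketch type, boolean isBroadcast) {
        if (isBroadcast) {
            return SpillingStrategyTypeSketch.FULL;
        }
        return type == PartitionTypeSketch.HYBRID_FULL
                ? SpillingStrategyTypeSketch.FULL
                : SpillingStrategyTypeSketch.SELECTIVE;
    }
}
```

With this shape the factory would not need the `isBroadcast && type == HYBRID_FULL` condition, because being broadcast alone determines the strategy.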



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0);
+
+    /** This is a unique field for each consumer of a single subpartition. */
+    private final int id;
+
+    public HsConsumerId(int id) {
+        this.id = id;
+    }

Review Comment:
   We can make this `private` to forbid creating ids from arbitrary integers (e.g., negative ones), and provide a factory like:
   ```
   public static HsConsumerId newId(@Nullable HsConsumerId lastId) {
     return lastId == null ? SINGLE_CONSUMER_ID : new HsConsumerId(lastId.id + 1);
   }
   ```
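
A self-contained version of this idea might look like the sketch below. The class name `ConsumerIdSketch` is hypothetical, and the `@Nullable` annotation is left out so the example compiles without extra dependencies; it only illustrates the private constructor plus factory pattern, not the final shape of `HsConsumerId`.

```java
import java.util.Objects;

/** Illustrative stand-in for HsConsumerId; the class name is hypothetical. */
final class ConsumerIdSketch {
    /** First id handed out; doubles as the id for the single-consumer case. */
    static final ConsumerIdSketch SINGLE_CONSUMER_ID = new ConsumerIdSketch(0);

    private final int id;

    // Private on purpose: callers cannot build ids from arbitrary (e.g. negative) integers.
    private ConsumerIdSketch(int id) {
        this.id = id;
    }

    /** Returns the first id when lastId is null, otherwise the id that follows lastId. */
    static ConsumerIdSketch newId(ConsumerIdSketch lastId) {
        return lastId == null ? SINGLE_CONSUMER_ID : new ConsumerIdSketch(lastId.id + 1);
    }

    int getId() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ConsumerIdSketch && ((ConsumerIdSketch) o).id == id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
```

A caller would keep the last id it handed out per subpartition and pass it back in, for example `lastId = ConsumerIdSketch.newId(lastId)`, starting from `null`.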



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);

Review Comment:
   I'd suggest naming this `ANY`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java:
##########
@@ -393,36 +352,24 @@ private boolean canBeCompressed(Buffer buffer) {
     // subpartitionLock.
     private void addFinishedBuffer(HsBufferContext bufferContext) {
         finishedBufferIndex++;
-        boolean needNotify =
-                callWithLock(
-                        () -> {
-                            allBuffers.add(bufferContext);
-                            unConsumedBuffers.add(bufferContext);
-                            bufferIndexToContexts.put(
-                                    bufferContext.getBufferIndexAndChannel().getBufferIndex(),
-                                    bufferContext);
-                            trimHeadingReleasedBuffers(unConsumedBuffers);
-                            updateStatistics(bufferContext.getBuffer());
-                            return unConsumedBuffers.size() <= 1;
-                        });
-        if (needNotify) {
-            memoryDataManagerOperation.onDataAvailable(targetChannel);
-        }
-    }
-
-    @GuardedBy("subpartitionLock")
-    private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
-        return checkFirstUnConsumedBufferIndex(nextToConsumeIndex)
-                ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType()
-                : DataType.NONE;
-    }
-
-    @GuardedBy("subpartitionLock")
-    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
-        trimHeadingReleasedBuffers(unConsumedBuffers);
-        return !unConsumedBuffers.isEmpty()
-                && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
-                        == expectedBufferIndex;
+        Set<HsConsumerId> needNotify = new HashSet<>(consumerMap.size());
+        runWithLock(
+                () -> {
+                    allBuffers.add(bufferContext);
+                    bufferIndexToContexts.put(
+                            bufferContext.getBufferIndexAndChannel().getBufferIndex(),
+                            bufferContext);
+                    for (Map.Entry<HsConsumerId, HsSubpartitionMemoryDataConsumer> consumerEntry :
+                            consumerMap.entrySet()) {
+                        if (consumerEntry.getValue().addBuffer(bufferContext)) {
+                            needNotify.add(consumerEntry.getKey());
+                        }
+                    }
+                    updateStatistics(bufferContext.getBuffer());
+                });
+        needNotify.forEach(
+                (consumerId) ->
+                        memoryDataManagerOperation.onDataAvailable(targetChannel, consumerId));

Review Comment:
   We might change `HsMemoryDataManagerOperation#onDataAvailable` to take a collection of consumer ids.
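
One possible shape for such a batched callback is sketched below. All names are hypothetical, consumer ids are plain integers for brevity, and the per-consumer buffer queue is reduced to a counter. It mirrors the pattern in the diff: collect the consumers that need a wake-up while holding the lock, then notify once outside the lock.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical batched callback: one call per finished buffer instead of one per consumer. */
interface DataAvailableListenerSketch {
    void onDataAvailable(int subpartitionId, Collection<Integer> consumerIds);
}

final class SubpartitionSketch {
    private final int subpartitionId;
    private final DataAvailableListenerSketch listener;
    private final Object lock = new Object();

    // consumer id -> number of unconsumed buffers queued for that consumer
    private final Map<Integer, Integer> backlogByConsumer = new LinkedHashMap<>();

    SubpartitionSketch(int subpartitionId, DataAvailableListenerSketch listener) {
        this.subpartitionId = subpartitionId;
        this.listener = listener;
    }

    void registerConsumer(int consumerId) {
        synchronized (lock) {
            backlogByConsumer.put(consumerId, 0);
        }
    }

    void addFinishedBuffer() {
        // Each consumer is visited once, so a list is enough to hold the ids.
        List<Integer> needNotify = new ArrayList<>();
        synchronized (lock) {
            for (Map.Entry<Integer, Integer> entry : backlogByConsumer.entrySet()) {
                // Only consumers whose queue was empty before this buffer need a wake-up.
                if (entry.getValue() == 0) {
                    needNotify.add(entry.getKey());
                }
                entry.setValue(entry.getValue() + 1);
            }
        }
        // Notify outside the lock, with a single call covering all affected consumers.
        if (!needNotify.isEmpty()) {
            listener.onDataAvailable(subpartitionId, needNotify);
        }
    }
}
```

Passing the whole collection lets the receiving side look up the subpartition's consumers once rather than once per consumer.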



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java:
##########
@@ -153,14 +153,18 @@ public synchronized void run() {
 
     /** This method only called by result partition to create subpartitionFileReader. */
     public HsDataView registerNewSubpartition(
-            int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
+            int subpartitionId,

Review Comment:
   We should rename this method to `registerNewConsumer`.
   
   Actually, I think we should apply the same "consumer" naming throughout `HsResultPartition`, `HsMemoryDataManager`, and `HsSubpartitionMemoryDataManager`, and even rename `HsSubpartitionView` and `HsSubpartitionViewInternalOperations`. We can leave the interface `ResultSubpartitionView` as is, since it is shared by different shuffle implementations.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java:
##########
@@ -393,36 +352,24 @@ private boolean canBeCompressed(Buffer buffer) {
     // subpartitionLock.
     private void addFinishedBuffer(HsBufferContext bufferContext) {
         finishedBufferIndex++;
-        boolean needNotify =
-                callWithLock(
-                        () -> {
-                            allBuffers.add(bufferContext);
-                            unConsumedBuffers.add(bufferContext);
-                            bufferIndexToContexts.put(
-                                    bufferContext.getBufferIndexAndChannel().getBufferIndex(),
-                                    bufferContext);
-                            trimHeadingReleasedBuffers(unConsumedBuffers);
-                            updateStatistics(bufferContext.getBuffer());
-                            return unConsumedBuffers.size() <= 1;
-                        });
-        if (needNotify) {
-            memoryDataManagerOperation.onDataAvailable(targetChannel);
-        }
-    }
-
-    @GuardedBy("subpartitionLock")
-    private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
-        return checkFirstUnConsumedBufferIndex(nextToConsumeIndex)
-                ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType()
-                : DataType.NONE;
-    }
-
-    @GuardedBy("subpartitionLock")
-    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
-        trimHeadingReleasedBuffers(unConsumedBuffers);
-        return !unConsumedBuffers.isEmpty()
-                && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
-                        == expectedBufferIndex;
+        Set<HsConsumerId> needNotify = new HashSet<>(consumerMap.size());

Review Comment:
   This could be a `List`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java:
##########
@@ -53,7 +53,16 @@ public interface HsMemoryDataManagerOperation {
     /**
      * This method is called when subpartition data become available.
      *
-     * @param subpartitionId the subpartition need notify data available.
+     * @param subpartitionId the subpartition's identifier that this consumer belongs to.
+     * @param consumerId the consumer's identifier which need notify data available.
      */
-    void onDataAvailable(int subpartitionId);
+    void onDataAvailable(int subpartitionId, HsConsumerId consumerId);
+
+    /**
+     * This method is called when consumer is decided to released.
+     *
+     * @param subpartitionId the subpartition's identifier that this consumer belongs to.
+     * @param consumerId the consumer's identifier which decided to be released.
+     */
+    void onConsumerRelease(int subpartitionId, HsConsumerId consumerId);

Review Comment:
   `onConsumerReleased`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1294505532

   @flinkbot run azure




[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1294698538

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1005162433


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java:
##########
@@ -153,14 +153,18 @@ public synchronized void run() {
 
     /** This method only called by result partition to create subpartitionFileReader. */
     public HsDataView registerNewSubpartition(
-            int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException {
+            int subpartitionId,

Review Comment:
   Rename `HsSubpartitionView` to `HsSubpartitionConsumer`, `HsSubpartitionViewInternalOperations` to `HsSubpartitionConsumerInternalOperations`, `HsSubpartitionMemoryDataConsumer` to `HsSubpartitionConsumerMemoryDataManager`.





[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1004431443


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import java.util.Objects;
+
+/** This class represents the identifier of hybrid shuffle's consumer. */
+public class HsConsumerId {
+    /**
+     * This consumer id is used in the scenarios that information related to specific consumer needs
+     * to be ignored.
+     */
+    public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1);
+
+    /**
+     * This consumer id is used in the scenarios that only one consumer is allowed for a single
+     * subpartition.
+     */
+    public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0);
+
+    /** This is a unique field for each consumer of a single subpartition. */
+    private final int id;
+
+    public HsConsumerId(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }

Review Comment:
   Changed.





[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296419534

   @flinkbot run azure




[GitHub] [flink] xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization
URL: https://github.com/apache/flink/pull/21122




[GitHub] [flink] flinkbot commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1284837792

   ## CI report:
   
   * 8ec5ea591f2a80d0bbf1be8ea56b18df82b0c11b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1002998199


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+            subpartitionViewOperationsMap;

Review Comment:
   Documentation added.





[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1002986901


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+            subpartitionViewOperationsMap;

Review Comment:
   I didn't change the `List` into an array because I didn't want to introduce a raw-typed `Map`.
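
For context, the trade-off can be illustrated with the sketch below. The class and method names are hypothetical, and `String` stands in for the view operations type. Java does not allow `new Map<Integer, String>[n]`, so the array variant has to create a raw `Map[]` and suppress the resulting warnings, while the list variant stays fully type-checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PerSubpartitionMapsSketch {
    private PerSubpartitionMapsSketch() {}

    /** List of maps: no raw types and no unchecked casts are needed. */
    static List<Map<Integer, String>> asList(int numSubpartitions) {
        List<Map<Integer, String>> maps = new ArrayList<>(numSubpartitions);
        for (int i = 0; i < numSubpartitions; i++) {
            maps.add(new ConcurrentHashMap<>());
        }
        return maps;
    }

    /** Array of maps: generic array creation is illegal, so a raw array must be created and cast. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<Integer, String>[] asArray(int numSubpartitions) {
        Map<Integer, String>[] maps = (Map<Integer, String>[]) new Map[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            maps[i] = new ConcurrentHashMap<>();
        }
        return maps;
    }
}
```

Both variants give indexed access by subpartition id; the list version simply avoids the suppressed warnings.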


