You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/16 08:02:16 UTC

[flink] branch master updated (bf40e2dc57e -> 39f0e9bfadf)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from bf40e2dc57e [FLINK-29966][python][doc] Adds PyFlink examples doc
     new 781ca8a4363 [hotfix] Fix an incorrect test case in NetworkBufferTest
     new 133ffab4f19 [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy
     new 39f0e9bfadf [FLINK-29818] fix the unstable test HsResultPartitionTest.testAvailability

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../partition/hybrid/HsFullSpillingStrategy.java   | 20 ++++++++++-------
 .../io/network/buffer/NetworkBufferTest.java       |  2 +-
 .../hybrid/HsFullSpillingStrategyTest.java         | 25 ++++------------------
 .../partition/hybrid/HsResultPartitionTest.java    | 13 ++++++++---
 .../partition/hybrid/HybridShuffleTestUtils.java   |  4 ----
 5 files changed, 27 insertions(+), 37 deletions(-)


[flink] 02/03: [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 133ffab4f19b91d5a5fba58684e2a30a6b8b31c7
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Nov 1 10:42:38 2022 +0800

    [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 20 ++++++++++-------
 .../hybrid/HsFullSpillingStrategyTest.java         | 25 ++++------------------
 .../partition/hybrid/HybridShuffleTestUtils.java   |  4 ----
 3 files changed, 16 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index 94a067fbaeb..1d0315a0364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -128,20 +128,21 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
             return;
         }
 
-        int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+        int releaseNum = (int) (poolSize * releaseBufferRatio);
         int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
-        int subpartitionSurvivedNum = survivedNum / numSubpartitions;
-
+        int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
         TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>();
 
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) {
             Deque<BufferIndexAndChannel> buffersInOrder =
                     spillingInfoProvider.getBuffersInOrder(
                             subpartitionId, SpillStatus.SPILL, ConsumeStatusWithId.ALL_ANY);
-            // if the number of subpartition buffers less than survived buffers, reserved all of
-            // them.
-            int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum);
-            while (releaseNum-- != 0) {
+            // if the number of subpartition spilling buffers is less than the expected release
+            // number, release all of them.
+            int subpartitionReleaseNum =
+                    Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum);
+            int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum;
+            while (subpartitionSurvivedNum-- != 0) {
                 buffersInOrder.pollLast();
             }
             bufferToRelease.put(subpartitionId, buffersInOrder);
@@ -149,7 +150,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
 
         // collect results in order
         for (int i = 0; i < numSubpartitions; i++) {
-            builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+            Deque<BufferIndexAndChannel> bufferIndexAndChannels = bufferToRelease.get(i);
+            if (bufferIndexAndChannels != null && !bufferIndexAndChannels.isEmpty()) {
+                builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>()));
+            }
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index e88918b2c0d..ca4ea9348dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -102,25 +102,10 @@ class HsFullSpillingStrategyTest {
         final int subpartition1 = 0;
         final int subpartition2 = 1;
 
-        final int progress1 = 10;
-        final int progress2 = 20;
-
         List<BufferIndexAndChannel> subpartitionBuffers1 =
-                createBufferIndexAndChannelsList(
-                        subpartition1,
-                        progress1,
-                        progress1 + 2,
-                        progress1 + 4,
-                        progress1 + 6,
-                        progress1 + 8);
+                createBufferIndexAndChannelsList(subpartition1, 1, 2, 3, 4, 5);
         List<BufferIndexAndChannel> subpartitionBuffers2 =
-                createBufferIndexAndChannelsList(
-                        subpartition2,
-                        progress2 + 1,
-                        progress2 + 3,
-                        progress2 + 5,
-                        progress2 + 7,
-                        progress2 + 9);
+                createBufferIndexAndChannelsList(subpartition2, 1, 2, 3, 4, 5);
 
         TestingSpillingInfoProvider spillInfoProvider =
                 TestingSpillingInfoProvider.builder()
@@ -133,8 +118,6 @@ class HsFullSpillingStrategyTest {
                                 () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
                         .setGetNumTotalRequestedBuffersSupplier(() -> 10)
                         .setGetPoolSizeSupplier(() -> 10)
-                        .setGetNextBufferIndexToConsumeSupplier(
-                                () -> Arrays.asList(progress1, progress2))
                         .build();
 
         Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
@@ -149,9 +132,9 @@ class HsFullSpillingStrategyTest {
 
         Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>();
         expectedReleaseBuffers.put(
-                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2)));
+                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3)));
         expectedReleaseBuffers.put(
-                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3)));
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4)));
         assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index f32e3179563..bd26b6d56a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.util.TestCounter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -39,9 +38,6 @@ public class HybridShuffleTestUtils {
             int subpartitionId, int... bufferIndexes) {
         List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>();
         for (int bufferIndex : bufferIndexes) {
-            MemorySegment segment =
-                    MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE);
-            NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {});
             bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId));
         }
         return bufferIndexAndChannels;


[flink] 03/03: [FLINK-29818] fix the unstable test HsResultPartitionTest.testAvailability

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39f0e9bfadf0317328ede1d8c7dd37894beaae38
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Nov 1 10:45:17 2022 +0800

    [FLINK-29818] fix the unstable test HsResultPartitionTest.testAvailability
    
    This closes #21209
---
 .../io/network/partition/hybrid/HsResultPartitionTest.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 8b6210d2b3e..71f777046e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -47,7 +47,6 @@ import org.apache.flink.util.IOUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
@@ -405,12 +404,20 @@ class HsResultPartitionTest {
     }
 
     @Test
-    @Disabled
     void testAvailability() throws Exception {
         final int numBuffers = 2;
+        final int numSubpartitions = 1;
 
         BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
-        HsResultPartition partition = createHsResultPartition(1, bufferPool);
+        HsResultPartition partition =
+                createHsResultPartition(
+                        numSubpartitions,
+                        bufferPool,
+                        HybridShuffleConfiguration.builder(
+                                        numSubpartitions, readBufferPool.getNumBuffersPerRequest())
+                                // Do not return buffer to bufferPool when memory is insufficient.
+                                .setFullStrategyReleaseBufferRatio(0)
+                                .build());
 
         partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 0);
         assertThat(partition.isAvailable()).isFalse();


[flink] 01/03: [hotfix] Fix an incorrect test case in NetworkBufferTest

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 781ca8a4363d92b93f2c16fe1478aafa40d1c4be
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Nov 1 13:43:25 2022 +0800

    [hotfix] Fix an incorrect test case in NetworkBufferTest
---
 .../org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
index 6030a4f7327..8e796fe0722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
@@ -92,7 +92,7 @@ public class NetworkBufferTest extends AbstractByteBufTest {
 
     @Test
     public void testDataBufferIsBuffer() {
-        assertFalse(newBuffer(1024, 1024, false).isBuffer());
+        assertTrue(newBuffer(1024, 1024, true).isBuffer());
     }
 
     @Test