Posted to commits@flink.apache.org by yi...@apache.org on 2022/01/16 08:57:54 UTC

[flink] branch release-1.14 updated (bfc108f -> b1efd3b)

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bfc108f  [FLINK-25160][docs] Clarified purpose of execution.checkpointing.tolerable-failed-checkpoints
     new 936f1d7  [FLINK-24954][network] Refresh read buffer request timeout on buffer recycling/requesting for sort-shuffle
     new b1efd3b  [hotfix] Fix typing errors for SortMergeResultPartitionReadScheduler#createSubpartitionReader

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/disk/BatchShuffleReadBufferPool.java        |  12 +++
 .../partition/SortMergeResultPartition.java        |   2 +-
 .../SortMergeResultPartitionReadScheduler.java     |  40 +++++--
 .../io/disk/BatchShuffleReadBufferPoolTest.java    |  28 +++++
 .../SortMergeResultPartitionReadSchedulerTest.java | 119 +++++++++++++++++++--
 5 files changed, 184 insertions(+), 17 deletions(-)

[flink] 02/02: [hotfix] Fix typing errors for SortMergeResultPartitionReadScheduler#createSubpartitionReader

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1efd3b0fbcf96cb9368ce925efe5fe67be04902
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Tue Dec 14 16:54:52 2021 +0800

    [hotfix] Fix typing errors for SortMergeResultPartitionReadScheduler#createSubpartitionReader
    
    This closes #17936.
---
 .../io/network/partition/SortMergeResultPartition.java     |  2 +-
 .../partition/SortMergeResultPartitionReadScheduler.java   |  2 +-
 .../SortMergeResultPartitionReadSchedulerTest.java         | 14 +++++++-------
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 650d1de..af27a0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -456,7 +456,7 @@ public class SortMergeResultPartition extends ResultPartition {
             checkState(!isReleased(), "Partition released.");
             checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
-            return readScheduler.crateSubpartitionReader(
+            return readScheduler.createSubpartitionReader(
                     availabilityListener, subpartitionIndex, resultFile);
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 7551dc8..539a43b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -289,7 +289,7 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         }
     }
 
-    SortMergeSubpartitionReader crateSubpartitionReader(
+    SortMergeSubpartitionReader createSubpartitionReader(
             BufferAvailabilityListener availabilityListener,
             int targetSubpartition,
             PartitionedFile resultFile)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 78ef5fa..4650630 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -105,7 +105,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     @Test
     public void testCreateSubpartitionReader() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         assertTrue(readScheduler.isRunning());
@@ -128,7 +128,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     @Test
     public void testOnSubpartitionReaderError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         subpartitionReader.releaseAllResources();
@@ -139,7 +139,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     @Test
     public void testReleaseWhileReading() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         Thread.sleep(1000);
@@ -158,7 +158,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     public void testCreateSubpartitionReaderAfterReleased() throws Exception {
         readScheduler.release();
         try {
-            readScheduler.crateSubpartitionReader(
+            readScheduler.createSubpartitionReader(
                     new NoOpBufferAvailablityListener(), 0, partitionedFile);
         } finally {
             assertAllResourcesReleased();
@@ -168,7 +168,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     @Test
     public void testOnDataReadError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         // close file channel to trigger data read exception
@@ -191,7 +191,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     @Test
     public void testOnReadBufferRequestError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         bufferPool.destroy();
@@ -212,7 +212,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                         bufferPool, executor, this, bufferRequestTimeout);
 
         SortMergeSubpartitionReader subpartitionReader =
-                readScheduler.crateSubpartitionReader(
+                readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
 
         PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();

[flink] 01/02: [FLINK-24954][network] Refresh read buffer request timeout on buffer recycling/requesting for sort-shuffle

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 936f1d76b3cf494cc98c0c81bb5acfe5b438d93d
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Fri Nov 26 16:46:55 2021 +0800

    [FLINK-24954][network] Refresh read buffer request timeout on buffer recycling/requesting for sort-shuffle
    
    The current read buffer request timeout for sort-shuffle is somewhat aggressive. When a running task encounters data skew or is simply slow, a timeout exception can be thrown even though the system is still making progress. To improve this, the buffer request timeout is refreshed whenever at least one buffer is recycled or allocated (see the sketch after this message), so the exception is only raised after the pool has been idle for the full timeout period.
    
    This closes #17936.
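
    For readers of this change, the core idea is that the buffer request deadline is derived
    from the pool's last buffer activity rather than fixed once at the start of the request
    loop, so the timeout only fires after the pool has been completely idle for the configured
    duration. Below is a minimal, hypothetical sketch of that pattern; SimpleBufferPool and
    allocateWithRefreshableTimeout are illustrative names only, not the Flink classes touched
    by this patch (those are shown in the diff that follows).

    import java.time.Duration;
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeoutException;

    /** Illustrative only: a tiny pool that remembers when a buffer was last requested or recycled. */
    class SimpleBufferPool {
        private final Queue<byte[]> buffers = new ArrayDeque<>();
        private long lastBufferOperationTimestamp = System.nanoTime();

        synchronized void recycle(byte[] buffer) {
            buffers.add(buffer);
            // any successful buffer activity refreshes the deadline for waiting requesters
            lastBufferOperationTimestamp = System.nanoTime();
        }

        synchronized byte[] poll() {
            byte[] buffer = buffers.poll();
            if (buffer != null) {
                lastBufferOperationTimestamp = System.nanoTime();
            }
            return buffer;
        }

        synchronized long getLastBufferOperationTimestamp() {
            return lastBufferOperationTimestamp;
        }
    }

    class RefreshableTimeoutExample {
        /** Waits for a buffer; times out only if the pool stays idle for the whole timeout. */
        static byte[] allocateWithRefreshableTimeout(SimpleBufferPool pool, Duration timeout)
                throws TimeoutException {
            long deadline = pool.getLastBufferOperationTimestamp() + timeout.toNanos();
            do {
                // the real buffer pool blocks inside its request call; this sketch simply polls
                byte[] buffer = pool.poll();
                if (buffer != null) {
                    return buffer;
                }
                // when the old deadline has passed, re-derive it from the latest pool activity;
                // recycling by another thread pushes the deadline forward and keeps us waiting
            } while (System.nanoTime() < deadline
                    || System.nanoTime()
                            < (deadline = pool.getLastBufferOperationTimestamp() + timeout.toNanos()));
            throw new TimeoutException("Buffer request timeout");
        }
    }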
---
 .../io/disk/BatchShuffleReadBufferPool.java        |  12 +++
 .../SortMergeResultPartitionReadScheduler.java     |  38 ++++++--
 .../io/disk/BatchShuffleReadBufferPoolTest.java    |  28 ++++++
 .../SortMergeResultPartitionReadSchedulerTest.java | 107 +++++++++++++++++++++
 4 files changed, 176 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 1000a44..7b69936 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -75,6 +75,10 @@ public class BatchShuffleReadBufferPool {
     @GuardedBy("buffers")
     private final Queue<MemorySegment> buffers = new ArrayDeque<>();
 
+    /** The timestamp when the last buffer is recycled or allocated. */
+    @GuardedBy("buffers")
+    private long lastBufferOperationTimestamp = System.nanoTime();
+
     /** Whether this buffer pool has been destroyed or not. */
     @GuardedBy("buffers")
     private boolean destroyed;
@@ -203,6 +207,7 @@ public class BatchShuffleReadBufferPool {
             while (allocated.size() < numBuffersPerRequest) {
                 allocated.add(buffers.poll());
             }
+            lastBufferOperationTimestamp = System.nanoTime();
         }
         return allocated;
     }
@@ -236,12 +241,19 @@ public class BatchShuffleReadBufferPool {
             }
 
             buffers.addAll(segments);
+            lastBufferOperationTimestamp = System.nanoTime();
             if (buffers.size() >= numBuffersPerRequest) {
                 buffers.notifyAll();
             }
         }
     }
 
+    public long getLastBufferOperationTimestamp() {
+        synchronized (buffers) {
+            return lastBufferOperationTimestamp;
+        }
+    }
+
     /** Destroys this buffer pool and after which, no buffer can be allocated any more. */
     public void destroy() {
         synchronized (buffers) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index a55a0c0..7551dc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
@@ -65,10 +64,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
             LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class);
 
     /**
-     * Maximum time (5min) to wait when requesting read buffers from the buffer pool before throwing
-     * an exception.
+     * Default maximum time (5min) to wait when requesting read buffers from the buffer pool before
+     * throwing an exception.
      */
-    private static final Duration BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);
+    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);
 
     /** Lock used to synchronize multi-thread access to thread-unsafe fields. */
     private final Object lock;
@@ -88,6 +87,12 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
     /** Maximum number of buffers can be allocated by this partition reader. */
     private final int maxRequestedBuffers;
 
+    /**
+     * Maximum time to wait when requesting read buffers from the buffer pool before throwing an
+     * exception.
+     */
+    private final Duration bufferRequestTimeout;
+
     /** All failed subpartition readers to be released. */
     @GuardedBy("lock")
     private final Set<SortMergeSubpartitionReader> failedReaders = new HashSet<>();
@@ -121,12 +126,22 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
 
     SortMergeResultPartitionReadScheduler(
             BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
+        this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
+    }
+
+    SortMergeResultPartitionReadScheduler(
+            BatchShuffleReadBufferPool bufferPool,
+            Executor ioExecutor,
+            Object lock,
+            Duration bufferRequestTimeout) {
+
         this.lock = checkNotNull(lock);
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         // one partition reader can consume at most 32M (the expected buffers per request is 8M)
         // buffers for data read. Currently, it is only an empirical value can not be configured
         this.maxRequestedBuffers = Math.max(1, 4 * bufferPool.getNumBuffersPerRequest());
+        this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
 
         // initialize the buffer pool eagerly to avoid reporting errors like OOM too late
         bufferPool.initialize();
@@ -147,21 +162,22 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
     }
 
-    private Queue<MemorySegment> allocateBuffers(
-            Queue<SortMergeSubpartitionReader> availableReaders) {
+    @VisibleForTesting
+    Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> availableReaders) {
         if (availableReaders.isEmpty()) {
             return new ArrayDeque<>();
         }
 
         try {
-            Deadline deadline = Deadline.fromNow(BUFFER_REQUEST_TIMEOUT);
-            while (deadline.hasTimeLeft()) {
+            long timeoutTime = getBufferRequestTimeoutTime();
+            do {
                 List<MemorySegment> buffers = bufferPool.requestBuffers();
                 if (!buffers.isEmpty()) {
                     return new ArrayDeque<>(buffers);
                 }
                 checkState(!isReleased, "Result partition has been already released.");
-            }
+            } while (System.nanoTime() < timeoutTime
+                    || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
 
             if (numRequestedBuffers <= 0) {
                 throw new TimeoutException(
@@ -178,6 +194,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         return new ArrayDeque<>();
     }
 
+    private long getBufferRequestTimeoutTime() {
+        return bufferPool.getLastBufferOperationTimestamp() + bufferRequestTimeout.toNanos();
+    }
+
     private void releaseBuffers(Queue<MemorySegment> buffers) {
         if (!buffers.isEmpty()) {
             try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
index 52823f0..6a3654b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
@@ -99,6 +99,34 @@ public class BatchShuffleReadBufferPoolTest {
     }
 
     @Test
+    public void testBufferOperationTimestampUpdated() throws Exception {
+        BatchShuffleReadBufferPool bufferPool = new BatchShuffleReadBufferPool(1024, 1024);
+        long oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        assertEquals(1, buffers.size());
+        // The timestamp is updated when requesting buffers successfully
+        assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp);
+
+        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        bufferPool.recycle(buffers);
+        // The timestamp is updated when recycling buffers
+        assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp);
+
+        buffers = bufferPool.requestBuffers();
+
+        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        assertEquals(0, bufferPool.requestBuffers().size());
+        // The timestamp is not updated when requesting buffers is failed
+        assertEquals(oldTimestamp, bufferPool.getLastBufferOperationTimestamp());
+
+        bufferPool.recycle(buffers);
+        bufferPool.destroy();
+    }
+
+    @Test
     public void testBufferFulfilledByRecycledBuffers() throws Exception {
         int numRequestThreads = 2;
         AtomicReference<Throwable> exception = new AtomicReference<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ebdaf0b..78ef5fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -29,7 +31,16 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -192,6 +203,102 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testRequestBufferTimeoutAndFailed() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        SortMergeResultPartitionReadScheduler readScheduler =
+                new SortMergeResultPartitionReadScheduler(
+                        bufferPool, executor, this, bufferRequestTimeout);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.crateSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
+        allReaders.add(subpartitionReader);
+
+        long startTimestamp = System.nanoTime();
+        Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers(allReaders);
+        long requestDuration = System.nanoTime() - startTimestamp;
+
+        assertEquals(0, allocatedBuffers.size());
+        assertTrue(requestDuration > bufferRequestTimeout.toNanos());
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+
+        bufferPool.recycle(buffers);
+        readScheduler.release();
+    }
+
+    @Test
+    public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+        FakeBatchShuffleReadBufferPool bufferPool =
+                new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
+        SortMergeResultPartitionReadScheduler readScheduler =
+                new SortMergeResultPartitionReadScheduler(
+                        bufferPool, executor, this, bufferRequestTimeout);
+
+        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+
+        PartitionedFileReader fileReader =
+                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
+
+        PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
+        allReaders.add(subpartitionReader);
+
+        long startTimestamp = System.nanoTime();
+        Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers(allReaders);
+        long requestDuration = System.nanoTime() - startTimestamp;
+
+        assertEquals(3, allocatedBuffers.size());
+        assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2);
+        assertNull(subpartitionReader.getFailureCause());
+
+        bufferPool.recycle(allocatedBuffers);
+        bufferPool.destroy();
+        dataFileChannel.close();
+        indexFileChannel.close();
+        readScheduler.release();
+    }
+
+    private static class FakeBatchShuffleReadBufferPool extends BatchShuffleReadBufferPool {
+        private final Queue<MemorySegment> requestedBuffers;
+
+        FakeBatchShuffleReadBufferPool(long totalBytes, int bufferSize) throws Exception {
+            super(totalBytes, bufferSize);
+            this.requestedBuffers = new LinkedList<>(requestBuffers());
+        }
+
+        @Override
+        public long getLastBufferOperationTimestamp() {
+            recycle(requestedBuffers.poll());
+            return super.getLastBufferOperationTimestamp();
+        }
+
+        @Override
+        public void destroy() {
+            recycle(requestedBuffers);
+            requestedBuffers.clear();
+            super.destroy();
+        }
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    private static void assertExpectedTimeoutException(Throwable throwable) {
+        assertNotNull(throwable);
+        assertTrue(
+                ExceptionUtils.findThrowableWithMessage(throwable, "Buffer request timeout")
+                        .isPresent());
+    }
+
     private void assertAllResourcesReleased() {
         assertNull(readScheduler.getDataFileChannel());
         assertNull(readScheduler.getIndexFileChannel());