You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/06/29 14:04:05 UTC

[flink] branch master updated (5c7716b4c0b -> a63b7dd35bd)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 5c7716b4c0b [FLINK-28043][formats][parquet] Fix "Invalid lambda deserialization" in AvroParquetReaders
     new 4c8e9a4c619 [hotfix] Fix the generic of RecordWriter
     new a63b7dd35bd [FLINK-27789][network] Disable overdraft buffer for LegacySource

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/api/writer/MultipleRecordWriters.java  | 13 ++++++++---
 .../io/network/api/writer/NonRecordWriter.java     |  3 +++
 .../io/network/api/writer/RecordWriter.java        |  5 +++++
 .../network/api/writer/RecordWriterDelegate.java   |  3 +++
 .../network/api/writer/ResultPartitionWriter.java  |  3 +++
 .../io/network/api/writer/SingleRecordWriter.java  |  5 +++++
 .../runtime/io/network/buffer/BufferPool.java      |  6 +++++
 .../runtime/io/network/buffer/LocalBufferPool.java | 10 ++++++++-
 .../io/network/partition/ResultPartition.java      |  4 ++++
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 10 +++++++++
 .../io/network/buffer/UnpooledBufferPool.java      |  8 +++++++
 .../partition/MockResultPartitionWriter.java       |  3 +++
 .../streaming/runtime/tasks/SourceStreamTask.java  |  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 26 ++++++++++++++++++++++
 15 files changed, 97 insertions(+), 5 deletions(-)


[flink] 01/02: [hotfix] Fix the generic of RecordWriter

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c8e9a4c61930436b607c1541779a7294aa4e0f7
Author: fanrui <19...@gmail.com>
AuthorDate: Wed Jun 29 11:35:08 2022 +0800

    [hotfix] Fix the generic of RecordWriter
---
 .../flink/runtime/io/network/api/writer/MultipleRecordWriters.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
index 60da2de1775..cfcf702eb3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
@@ -49,7 +49,7 @@ public class MultipleRecordWriters<T extends IOReadableWritable>
     @Override
     public void broadcastEvent(AbstractEvent event) throws IOException {
         IOException exception = null;
-        for (RecordWriter recordWriter : recordWriters) {
+        for (RecordWriter<T> recordWriter : recordWriters) {
             try {
                 recordWriter.broadcastEvent(event);
             } catch (IOException e) {
@@ -80,7 +80,7 @@ public class MultipleRecordWriters<T extends IOReadableWritable>
 
     @Override
     public boolean isAvailable() {
-        for (RecordWriter recordWriter : recordWriters) {
+        for (RecordWriter<T> recordWriter : recordWriters) {
             if (!recordWriter.isAvailable()) {
                 return false;
             }
@@ -90,7 +90,7 @@ public class MultipleRecordWriters<T extends IOReadableWritable>
 
     @Override
     public void close() {
-        for (RecordWriter recordWriter : recordWriters) {
+        for (RecordWriter<T> recordWriter : recordWriters) {
             recordWriter.close();
         }
     }


[flink] 02/02: [FLINK-27789][network] Disable overdraft buffer for LegacySource

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a63b7dd35bdb1aaacd7b622d0c694ee06334759e
Author: fanrui <19...@gmail.com>
AuthorDate: Wed Jun 29 11:35:58 2022 +0800

    [FLINK-27789][network] Disable overdraft buffer for LegacySource
---
 .../network/api/writer/MultipleRecordWriters.java  |  7 ++++++
 .../io/network/api/writer/NonRecordWriter.java     |  3 +++
 .../io/network/api/writer/RecordWriter.java        |  5 +++++
 .../network/api/writer/RecordWriterDelegate.java   |  3 +++
 .../network/api/writer/ResultPartitionWriter.java  |  3 +++
 .../io/network/api/writer/SingleRecordWriter.java  |  5 +++++
 .../runtime/io/network/buffer/BufferPool.java      |  6 +++++
 .../runtime/io/network/buffer/LocalBufferPool.java | 10 ++++++++-
 .../io/network/partition/ResultPartition.java      |  4 ++++
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 10 +++++++++
 .../io/network/buffer/UnpooledBufferPool.java      |  8 +++++++
 .../partition/MockResultPartitionWriter.java       |  3 +++
 .../streaming/runtime/tasks/SourceStreamTask.java  |  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 26 ++++++++++++++++++++++
 15 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
index cfcf702eb3c..9b0f99e55a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
@@ -70,6 +70,13 @@ public class MultipleRecordWriters<T extends IOReadableWritable>
         return recordWriters.get(outputIndex);
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        for (RecordWriter<T> recordWriter : recordWriters) {
+            recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+        }
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         for (int i = 0; i < futures.length; i++) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
index 84a58a61b0c..63783a66e89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
@@ -37,6 +37,9 @@ public class NonRecordWriter<T extends IOReadableWritable> implements RecordWrit
         throw new UnsupportedOperationException("No record writer instance.");
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {}
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         throw new UnsupportedOperationException("No record writer instance.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 9d5f351ef89..57f0a800e37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -217,6 +217,11 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
         }
     }
 
+    /** Sets the max overdraft buffer size of per gate. */
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        targetPartition.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
index f2d99939cd6..4b4790c7db8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
@@ -45,4 +45,7 @@ public interface RecordWriterDelegate<T extends IOReadableWritable>
      * @param outputIndex the index respective to the record writer instance.
      */
     RecordWriter<T> getRecordWriter(int outputIndex);
+
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 9a505722f6c..80774198c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -53,6 +53,9 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 
     int getNumTargetKeyGroups();
 
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
+
     /** Writes the given serialized record to the target subpartition. */
     void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
index bd51cb5f1ca..e1cc51ea959 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
@@ -50,6 +50,11 @@ public class SingleRecordWriter<T extends IOReadableWritable> implements RecordW
         return recordWriter;
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         return recordWriter.getAvailableFuture();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 8474bf8ae78..c574607e28e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -64,6 +64,12 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
      */
     void setNumBuffers(int numBuffers);
 
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
+
+    /** Returns the max overdraft buffer size of per gate. */
+    int getMaxOverdraftBuffersPerGate();
+
     /** Returns the number memory segments, which are currently held by this buffer pool. */
     int getNumberOfAvailableMemorySegments();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a5c54d3ea7a..e5bfc087982 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -119,7 +119,7 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private int unavailableSubpartitionsCount = 0;
 
-    private final int maxOverdraftBuffersPerGate;
+    private int maxOverdraftBuffersPerGate;
 
     @GuardedBy("availableMemorySegments")
     private int numberOfRequestedOverdraftMemorySegments;
@@ -665,6 +665,14 @@ class LocalBufferPool implements BufferPool {
         mayNotifyAvailable(toNotify);
     }
 
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+    }
+
+    public int getMaxOverdraftBuffersPerGate() {
+        return maxOverdraftBuffersPerGate;
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         return availabilityHelper.getAvailableFuture();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 09b9582f0d0..ccdf8f4a138 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -192,6 +192,10 @@ public abstract class ResultPartition implements ResultPartitionWriter {
     /** Returns the number of queued buffers of the given target subpartition. */
     public abstract int getNumberOfQueuedBuffers(int targetSubpartition);
 
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        this.bufferPool.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     /**
      * Returns the type of this result partition.
      *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index bdd6b4b5276..d4ab33fa053 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -99,6 +99,16 @@ public class NoOpBufferPool implements BufferPool {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getMaxOverdraftBuffersPerGate() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public int getNumberOfAvailableMemorySegments() {
         throw new UnsupportedOperationException();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
index eb4d01986b8..0ff07afe5f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -100,6 +100,14 @@ public class UnpooledBufferPool implements BufferPool {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {}
+
+    @Override
+    public int getMaxOverdraftBuffersPerGate() {
+        return 0;
+    }
+
     @Override
     public int getNumberOfAvailableMemorySegments() {
         return Integer.MAX_VALUE;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index 25ee8161ba4..99d87c8c066 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -53,6 +53,9 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
         return 1;
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {}
+
     @Override
     public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 0e5bdd049c0..04cb9dcca65 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -164,6 +164,7 @@ public class SourceStreamTask<
                 .gauge(
                         MetricNames.CHECKPOINT_START_DELAY_TIME,
                         this::getAsyncCheckpointStartDelayNanos);
+        recordWriter.setMaxOverdraftBuffersPerGate(0);
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index add4174135d..55a2f527a10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -272,7 +272,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     /** Thread pool for async snapshot workers. */
     private final ExecutorService asyncOperationsThreadPool;
 
-    private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
+    protected final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
 
     protected final MailboxProcessor mailboxProcessor;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d16c5abc2a4..3778a90eb6d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -95,6 +95,8 @@ import static org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpoint
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * These tests verify that the RichFunction methods are called (in correct order). And that
@@ -598,6 +600,30 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         }
     }
 
+    @Test
+    void testDisableOverdraftBuffer() throws Exception {
+        try (NettyShuffleEnvironment env =
+                        new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(2).build();
+                ResultPartition partitionWriter =
+                        PartitionTestUtils.createPartition(
+                                env, ResultPartitionType.PIPELINED_BOUNDED, 1)) {
+            partitionWriter.setup();
+            assertTrue(partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate() > 0);
+
+            final CompletableFuture<Long> checkpointCompleted = new CompletableFuture<>();
+            try (StreamTaskMailboxTestHarness<String> testHarness =
+                    new StreamTaskMailboxTestHarnessBuilder<>(
+                                    SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                            .addAdditionalOutput(partitionWriter)
+                            .setupOperatorChain(new StreamSource<>(new MockSource(0, 0, 1)))
+                            .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .build()) {
+
+                assertEquals(0, partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate());
+            }
+        }
+    }
+
     @Test
     void testClosedOnRestoreSourceSkipExecution() throws Exception {
         LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();