You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/27 07:39:27 UTC

[flink] branch master updated (4d6c2df74f4 -> 94a3e2f7580)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 4d6c2df74f4 [hotfix][runtime] Fix the logger of BlocklistDeclarativeSlotPool
     new b3b2a54a15f [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter
     new 94a3e2f7580 [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/sink/writer/AsyncSinkWriter.java          | 24 +++++++++++-----------
 .../base/sink/writer/AsyncSinkWriterTest.java      | 12 +++++------
 2 files changed, 18 insertions(+), 18 deletions(-)


[flink] 01/02: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3b2a54a15f9cbab0a98ece4a5f83ff518dc693f
Author: davidliu <da...@126.com>
AuthorDate: Tue Jul 26 12:48:07 2022 +0800

    [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter
---
 .../base/sink/writer/AsyncSinkWriter.java          | 41 +++++++++++-----------
 .../base/sink/writer/AsyncSinkWriterTest.java      | 34 +++++++++---------
 2 files changed, 36 insertions(+), 39 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 090504ab8bc..faab5889ad0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -17,15 +17,6 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.Preconditions;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -36,6 +27,14 @@ import java.util.Deque;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.function.Consumer;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
@@ -170,14 +169,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * the valid limits of the destination). The logic then needs to create and execute the request
      * asynchronously against the destination (ideally by batching together multiple request entries
      * to increase efficiency). The logic also needs to identify individual request entries that
-     * were not persisted successfully and resubmit them using the {@code requestResult} callback.
+     * were not persisted successfully and resubmit them using the {@code requestToRetry} callback.
      *
      * <p>From a threading perspective, the mailbox thread will call this method and initiate the
      * asynchronous request to persist the {@code requestEntries}. NOTE: The client must support
      * asynchronous requests and the method called to persist the records must asynchronously
      * execute and return a future with the results of that request. A thread from the destination
      * client thread pool should complete the request and submit the failed entries that should be
-     * retried. The {@code requestResult} will then trigger the mailbox thread to requeue the
+     * retried. The {@code requestToRetry} will then trigger the mailbox thread to requeue the
      * unsuccessful elements.
      *
      * <p>An example implementation of this method is included:
@@ -185,15 +184,15 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * <pre>{@code
      * @Override
      * protected void submitRequestEntries
-     *   (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> requestResult) {
+     *   (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> requestToRetry) {
      *     Future<Response> response = destinationClient.putRecords(records);
      *     response.whenComplete(
      *         (response, error) -> {
      *             if(error){
      *                 List<RequestEntryT> retryableFailedRecords = getRetryableFailed(response);
-     *                 requestResult.accept(retryableFailedRecords);
+     *                 requestToRetry.accept(retryableFailedRecords);
      *             }else{
-     *                 requestResult.accept(Collections.emptyList());
+     *                 requestToRetry.accept(Collections.emptyList());
      *             }
      *         }
      *     );
@@ -205,14 +204,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * requests.
      *
      * @param requestEntries a set of request entries that should be sent to the destination
-     * @param requestResult the {@code accept} method should be called on this Consumer once the
+     * @param requestToRetry the {@code accept} method should be called on this Consumer once the
      *     processing of the {@code requestEntries} are complete. Any entries that encountered
-     *     difficulties in persisting should be re-queued through {@code requestResult} by including
-     *     that element in the collection of {@code RequestEntryT}s passed to the {@code accept}
-     *     method. All other elements are assumed to have been successfully persisted.
+     *     difficulties in persisting should be re-queued through {@code requestToRetry} by
+     *     including that element in the collection of {@code RequestEntryT}s passed to the {@code
+     *     accept} method. All other elements are assumed to have been successfully persisted.
      */
     protected abstract void submitRequestEntries(
-            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);
 
     /**
      * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
@@ -381,7 +380,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
         }
 
         long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        Consumer<List<RequestEntryT>> requestToRetry =
                 failedRequestEntries ->
                         mailboxExecutor.execute(
                                 () ->
@@ -394,7 +393,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
 
         inFlightRequestsCount++;
         inFlightMessages += batchSize;
-        submitRequestEntries(batch, requestResult);
+        submitRequestEntries(batch, requestToRetry);
     }
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index c8535bfcef1..52f38242953 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,12 +17,11 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -37,12 +36,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
-import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
@@ -1059,12 +1057,12 @@ public class AsyncSinkWriterTest {
          * <p>A limitation of this basic implementation is that each element written must be unique.
          *
          * @param requestEntries a set of request entries that should be persisted to {@code res}
-         * @param requestResult a Consumer that needs to accept a collection of failure elements
+         * @param requestToRetry a Consumer that needs to accept a collection of failure elements
          *     once all request entries have been persisted
          */
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> requestToRetry) {
             maybeDelay();
 
             if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) {
@@ -1087,10 +1085,10 @@ public class AsyncSinkWriterTest {
 
                 requestEntries.removeAll(firstTimeFailed);
                 res.addAll(requestEntries);
-                requestResult.accept(firstTimeFailed);
+                requestToRetry.accept(firstTimeFailed);
             } else {
                 res.addAll(requestEntries);
-                requestResult.accept(new ArrayList<>());
+                requestToRetry.accept(new ArrayList<>());
             }
         }
 
@@ -1239,7 +1237,7 @@ public class AsyncSinkWriterTest {
 
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> requestToRetry) {
             if (requestEntries.size() == 3) {
                 try {
                     delayedStartLatch.countDown();
@@ -1258,7 +1256,7 @@ public class AsyncSinkWriterTest {
             }
 
             res.addAll(requestEntries);
-            requestResult.accept(new ArrayList<>());
+            requestToRetry.accept(new ArrayList<>());
         }
     }
 }


[flink] 02/02: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94a3e2f7580826698ae71472244b32db9717cd37
Author: davidliu <da...@126.com>
AuthorDate: Tue Jul 26 18:58:41 2022 +0800

    [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSinkWriter
---
 .../base/sink/writer/AsyncSinkWriter.java          | 17 +++++++++--------
 .../base/sink/writer/AsyncSinkWriterTest.java      | 22 ++++++++++++----------
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index faab5889ad0..e579190913b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -17,6 +17,15 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.Preconditions;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -27,14 +36,6 @@ import java.util.Deque;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.function.Consumer;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.Preconditions;
 
 /**
  * A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 52f38242953..d14d4a36f87 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,11 +37,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete