You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:24 UTC

[flink] 01/06: [FLINK-30755][connector] Support getting attempt number from Sink context

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 833c4d25c884a2657fdf4bbdf4127a44faede7ae
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Sun Jan 22 23:26:51 2023 +0800

    [FLINK-30755][connector] Support getting attempt number from Sink context
---
 .../flink/connector/base/sink/writer/TestSinkInitContext.java      | 5 +++++
 .../org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java   | 5 +++++
 .../src/main/java/org/apache/flink/api/connector/sink2/Sink.java   | 7 +++++++
 .../flink/streaming/runtime/operators/sink/SinkWriterOperator.java | 5 +++++
 .../org/apache/flink/streaming/api/functions/PrintSinkTest.java    | 5 +++++
 5 files changed, 27 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index b1461903fc8..3e7d1c159aa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -122,6 +122,11 @@ public class TestSinkInitContext implements Sink.InitContext {
         return 0;
     }
 
+    @Override
+    public int getAttemptNumber() {
+        return 0;
+    }
+
     @Override
     public SinkWriterMetricGroup metricGroup() {
         return metricGroup;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 8dfa5f6e534..98f64ed138f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -474,6 +474,11 @@ public class KafkaWriterITCase {
             return 1;
         }
 
+        @Override
+        public int getAttemptNumber() {
+            return 0;
+        }
+
         @Override
         public SinkWriterMetricGroup metricGroup() {
             return metricGroup;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index c006ba5c12b..58bd1a1dd94 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -95,6 +95,13 @@ public interface Sink<InputT> extends Serializable {
         /** @return The number of parallel Sink tasks. */
         int getNumberOfParallelSubtasks();
 
+        /**
+         * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
+         *
+         * @return Attempt number of the subtask.
+         */
+        int getAttemptNumber();
+
         /** @return The metric group this writer belongs to. */
         SinkWriterMetricGroup metricGroup();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index e593616abf7..7646584e28f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -313,6 +313,11 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
             return runtimeContext.getNumberOfParallelSubtasks();
         }
 
+        @Override
+        public int getAttemptNumber() {
+            return runtimeContext.getAttemptNumber();
+        }
+
         @Override
         public MailboxExecutor getMailboxExecutor() {
             return mailboxExecutor;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index 2a86e20fcdd..69d8e4af968 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -191,6 +191,11 @@ class PrintSinkTest {
             return numSubtasks;
         }
 
+        @Override
+        public int getAttemptNumber() {
+            return 0;
+        }
+
         @Override
         public SinkWriterMetricGroup metricGroup() {
             return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());