You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:24 UTC
[flink] 01/06: [FLINK-30755][connector] Support getting attempt number from Sink context
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 833c4d25c884a2657fdf4bbdf4127a44faede7ae
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Sun Jan 22 23:26:51 2023 +0800
[FLINK-30755][connector] Support getting attempt number from Sink context
---
.../flink/connector/base/sink/writer/TestSinkInitContext.java | 5 +++++
.../org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java | 5 +++++
.../src/main/java/org/apache/flink/api/connector/sink2/Sink.java | 7 +++++++
.../flink/streaming/runtime/operators/sink/SinkWriterOperator.java | 5 +++++
.../org/apache/flink/streaming/api/functions/PrintSinkTest.java | 5 +++++
5 files changed, 27 insertions(+)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index b1461903fc8..3e7d1c159aa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -122,6 +122,11 @@ public class TestSinkInitContext implements Sink.InitContext {
return 0;
}
+ @Override
+ public int getAttemptNumber() {
+ return 0;
+ }
+
@Override
public SinkWriterMetricGroup metricGroup() {
return metricGroup;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 8dfa5f6e534..98f64ed138f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -474,6 +474,11 @@ public class KafkaWriterITCase {
return 1;
}
+ @Override
+ public int getAttemptNumber() {
+ return 0;
+ }
+
@Override
public SinkWriterMetricGroup metricGroup() {
return metricGroup;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index c006ba5c12b..58bd1a1dd94 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -95,6 +95,13 @@ public interface Sink<InputT> extends Serializable {
/** @return The number of parallel Sink tasks. */
int getNumberOfParallelSubtasks();
+ /**
+ * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
+ *
+ * @return Attempt number of the subtask.
+ */
+ int getAttemptNumber();
+
/** @return The metric group this writer belongs to. */
SinkWriterMetricGroup metricGroup();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index e593616abf7..7646584e28f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -313,6 +313,11 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
return runtimeContext.getNumberOfParallelSubtasks();
}
+ @Override
+ public int getAttemptNumber() {
+ return runtimeContext.getAttemptNumber();
+ }
+
@Override
public MailboxExecutor getMailboxExecutor() {
return mailboxExecutor;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index 2a86e20fcdd..69d8e4af968 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -191,6 +191,11 @@ class PrintSinkTest {
return numSubtasks;
}
+ @Override
+ public int getAttemptNumber() {
+ return 0;
+ }
+
@Override
public SinkWriterMetricGroup metricGroup() {
return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());