You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/15 08:29:31 UTC
[flink-statefun] 05/06: [FLINK-19199] [core] Add execution attempt
ID to feedback channel keys
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 7d3b2af409ba71ef8f034c21e9d7f9f820f3f3cb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Sep 11 12:01:33 2020 +0800
[FLINK-19199] [core] Add execution attempt ID to feedback channel keys
This closes #148.
---
.../org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java | 4 ++--
.../flink/statefun/flink/core/feedback/FeedbackSinkOperator.java | 3 ++-
.../flink/statefun/flink/core/feedback/FeedbackUnionOperator.java | 5 +++--
.../flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java | 7 +++++--
.../flink/statefun/flink/core/feedback/FeedbackChannelTest.java | 2 +-
5 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
index 02f1e74..caff5ee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
@@ -33,8 +33,8 @@ public final class FeedbackKey<V> implements Serializable {
this.invocationId = invocationId;
}
- public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex) {
- return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex);
+ public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int attemptId) {
+ return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, subTaskIndex);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
index d9b6045..4970a51 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
@@ -67,7 +67,8 @@ public final class FeedbackSinkOperator<V> extends AbstractStreamOperator<Void>
public void open() throws Exception {
super.open();
final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final SubtaskFeedbackKey<V> key = this.key.withSubTaskIndex(indexOfThisSubtask);
+ final int attemptNum = getRuntimeContext().getAttemptNumber();
+ final SubtaskFeedbackKey<V> key = this.key.withSubTaskIndex(indexOfThisSubtask, attemptNum);
FeedbackChannelBroker broker = FeedbackChannelBroker.get();
this.channel = broker.getChannel(key);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 6c23ba3..efed323 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -168,8 +168,9 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
}
private void registerFeedbackConsumer(Executor mailboxExecutor) {
- final SubtaskFeedbackKey<T> key =
- feedbackKey.withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask());
+ final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final int attemptNum = getRuntimeContext().getAttemptNumber();
+ final SubtaskFeedbackKey<T> key = feedbackKey.withSubTaskIndex(indexOfThisSubtask, attemptNum);
FeedbackChannelBroker broker = FeedbackChannelBroker.get();
FeedbackChannel<T> channel = broker.getChannel(key);
channel.registerConsumer(this, mailboxExecutor);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
index 1a22c55..a582ea3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
@@ -29,11 +29,13 @@ public final class SubtaskFeedbackKey<V> implements Serializable {
private final String pipelineName;
private final int subtaskIndex;
private final long invocationId;
+ private final int attemptId;
- SubtaskFeedbackKey(String pipeline, long invocationId, int subtaskIndex) {
+ SubtaskFeedbackKey(String pipeline, long invocationId, int subtaskIndex, int attemptId) {
this.pipelineName = Objects.requireNonNull(pipeline);
this.invocationId = invocationId;
this.subtaskIndex = subtaskIndex;
+ this.attemptId = attemptId;
}
@Override
@@ -47,11 +49,12 @@ public final class SubtaskFeedbackKey<V> implements Serializable {
SubtaskFeedbackKey<?> that = (SubtaskFeedbackKey<?>) o;
return subtaskIndex == that.subtaskIndex
&& invocationId == that.invocationId
+ && attemptId == that.attemptId
&& Objects.equals(pipelineName, that.pipelineName);
}
@Override
public int hashCode() {
- return Objects.hash(pipelineName, subtaskIndex, invocationId);
+ return Objects.hash(pipelineName, subtaskIndex, invocationId, attemptId);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
index 996e7fd..fc891ec 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
@@ -43,7 +43,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
})
public class FeedbackChannelTest {
private static final SubtaskFeedbackKey<String> KEY =
- new FeedbackKey<String>("foo", 1).withSubTaskIndex(0);
+ new FeedbackKey<String>("foo", 1).withSubTaskIndex(0, 1);
@Test
public void exampleUsage() {