You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/15 08:29:31 UTC

[flink-statefun] 05/06: [FLINK-19199] [core] Add execution attempt ID to feedback channel keys

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 7d3b2af409ba71ef8f034c21e9d7f9f820f3f3cb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Sep 11 12:01:33 2020 +0800

    [FLINK-19199] [core] Add execution attempt ID to feedback channel keys
    
    This closes #148.
---
 .../org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java | 4 ++--
 .../flink/statefun/flink/core/feedback/FeedbackSinkOperator.java   | 3 ++-
 .../flink/statefun/flink/core/feedback/FeedbackUnionOperator.java  | 5 +++--
 .../flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java     | 7 +++++--
 .../flink/statefun/flink/core/feedback/FeedbackChannelTest.java    | 2 +-
 5 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
index 02f1e74..caff5ee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
@@ -33,8 +33,8 @@ public final class FeedbackKey<V> implements Serializable {
     this.invocationId = invocationId;
   }
 
-  public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex) {
-    return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex);
+  public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int attemptId) {
+    return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, subTaskIndex);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
index d9b6045..4970a51 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.java
@@ -67,7 +67,8 @@ public final class FeedbackSinkOperator<V> extends AbstractStreamOperator<Void>
   public void open() throws Exception {
     super.open();
     final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
-    final SubtaskFeedbackKey<V> key = this.key.withSubTaskIndex(indexOfThisSubtask);
+    final int attemptNum = getRuntimeContext().getAttemptNumber();
+    final SubtaskFeedbackKey<V> key = this.key.withSubTaskIndex(indexOfThisSubtask, attemptNum);
 
     FeedbackChannelBroker broker = FeedbackChannelBroker.get();
     this.channel = broker.getChannel(key);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 6c23ba3..efed323 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -168,8 +168,9 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   }
 
   private void registerFeedbackConsumer(Executor mailboxExecutor) {
-    final SubtaskFeedbackKey<T> key =
-        feedbackKey.withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask());
+    final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    final int attemptNum = getRuntimeContext().getAttemptNumber();
+    final SubtaskFeedbackKey<T> key = feedbackKey.withSubTaskIndex(indexOfThisSubtask, attemptNum);
     FeedbackChannelBroker broker = FeedbackChannelBroker.get();
     FeedbackChannel<T> channel = broker.getChannel(key);
     channel.registerConsumer(this, mailboxExecutor);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
index 1a22c55..a582ea3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/SubtaskFeedbackKey.java
@@ -29,11 +29,13 @@ public final class SubtaskFeedbackKey<V> implements Serializable {
   private final String pipelineName;
   private final int subtaskIndex;
   private final long invocationId;
+  private final int attemptId;
 
-  SubtaskFeedbackKey(String pipeline, long invocationId, int subtaskIndex) {
+  SubtaskFeedbackKey(String pipeline, long invocationId, int subtaskIndex, int attemptId) {
     this.pipelineName = Objects.requireNonNull(pipeline);
     this.invocationId = invocationId;
     this.subtaskIndex = subtaskIndex;
+    this.attemptId = attemptId;
   }
 
   @Override
@@ -47,11 +49,12 @@ public final class SubtaskFeedbackKey<V> implements Serializable {
     SubtaskFeedbackKey<?> that = (SubtaskFeedbackKey<?>) o;
     return subtaskIndex == that.subtaskIndex
         && invocationId == that.invocationId
+        && attemptId == that.attemptId
         && Objects.equals(pipelineName, that.pipelineName);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(pipelineName, subtaskIndex, invocationId);
+    return Objects.hash(pipelineName, subtaskIndex, invocationId, attemptId);
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
index 996e7fd..fc891ec 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/FeedbackChannelTest.java
@@ -43,7 +43,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 })
 public class FeedbackChannelTest {
   private static final SubtaskFeedbackKey<String> KEY =
-      new FeedbackKey<String>("foo", 1).withSubTaskIndex(0);
+      new FeedbackKey<String>("foo", 1).withSubTaskIndex(0, 1);
 
   @Test
   public void exampleUsage() {