You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/28 22:12:01 UTC

[beam] branch master updated: [BEAM-10120] Add dynamic timer support to portable Flink.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c8bf3  [BEAM-10120] Add dynamic timer support to portable Flink.
     new 73731ec  Merge pull request #13783 from [BEAM-10120] Add dynamic timer support to portable Flink
b8c8bf3 is described below

commit b8c8bf3644b449fc009ebe0a2a07cb155bba0989
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Jan 20 19:09:38 2021 -0800

    [BEAM-10120] Add dynamic timer support to portable Flink.
---
 runners/flink/job-server/flink_job_server.gradle      |  1 -
 .../translation/wrappers/streaming/DoFnOperator.java  | 19 ++++++++++++++++---
 .../streaming/ExecutableStageDoFnOperator.java        |  7 +++----
 .../streaming/ExecutableStageDoFnOperatorTest.java    |  4 +++-
 .../fnexecution/control/TimerReceiverFactory.java     |  1 +
 .../translation/PipelineTranslatorUtils.java          |  4 ++--
 .../runners/portability/flink_runner_test.py          |  3 ---
 7 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 130dd97..aa68411 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -163,7 +163,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
         excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 9c1e437..b463db26 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1412,6 +1412,10 @@ public class DoFnOperator<InputT, OutputT>
       return timer.getOutputTimestamp().isBefore(timer.getTimestamp());
     }
 
+    private String constructTimerId(String timerFamilyId, String timerId) {
+      return timerFamilyId + "+" + timerId;
+    }
+
     @Override
     public void setTimer(
         StateNamespace namespace,
@@ -1437,7 +1441,10 @@ public class DoFnOperator<InputT, OutputT>
             timer.getTimerId(),
             timer.getTimestamp().getMillis(),
             timer.getOutputTimestamp().getMillis());
-        String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace());
+        String contextTimerId =
+            getContextTimerId(
+                constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+                timer.getNamespace());
         @Nullable final TimerData oldTimer = pendingTimersById.get(contextTimerId);
         if (!timer.equals(oldTimer)) {
           // Only one timer can exist at a time for a given timer id and context.
@@ -1500,7 +1507,10 @@ public class DoFnOperator<InputT, OutputT>
      */
     void onFiredOrDeletedTimer(TimerData timer) {
       try {
-        pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
+        pendingTimersById.remove(
+            getContextTimerId(
+                constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+                timer.getNamespace()));
         if (timer.getDomain() == TimeDomain.EVENT_TIME
             || StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
           if (timerUsesOutputTimestamp(timer)) {
@@ -1532,7 +1542,10 @@ public class DoFnOperator<InputT, OutputT>
     @Override
     @Deprecated
     public void deleteTimer(TimerData timer) {
-      deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getDomain());
+      deleteTimer(
+          timer.getNamespace(),
+          constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+          timer.getDomain());
     }
 
     void deleteTimerInternal(TimerData timer) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d479a3c..42949c0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -522,8 +522,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       try (Locker locker = Locker.locked(stateBackendLock)) {
         getKeyedStateBackend().setCurrentKey(encodedKey);
         if (timerElement.getClearBit()) {
-          timerInternals.deleteTimer(
-              timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain());
+          timerInternals.deleteTimer(timerData);
         } else {
           timerInternals.setTimer(timerData);
           if (!timerData.getTimerId().equals(GC_TIMER_ID)) {
@@ -973,7 +972,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         processElement(stateValue);
       } else {
         KV<String, String> transformAndTimerFamilyId =
-            TimerReceiverFactory.decodeTimerDataTimerId(timerId);
+            TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
         LOG.debug(
             "timer callback: {} {} {} {} {}",
             transformAndTimerFamilyId.getKey(),
@@ -990,7 +989,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         Timer<?> timerValue =
             Timer.of(
                 timerKey,
-                "",
+                timerId,
                 Collections.singletonList(window),
                 timestamp,
                 outputTimestamp,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 336b414..49039a4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -510,8 +510,8 @@ public class ExecutableStageDoFnOperatorTest {
                     timestamp,
                     PaneInfo.NO_FIRING),
                 TimerInternals.TimerData.of(
-                    TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId),
                     "",
+                    TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId),
                     StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow),
                     timestamp,
                     timestamp,
@@ -799,6 +799,7 @@ public class ExecutableStageDoFnOperatorTest {
     // user timer that fires after the end of the window and after state cleanup
     TimerInternals.TimerData userTimer =
         TimerInternals.TimerData.of(
+            "",
             TimerReceiverFactory.encodeToTimerDataTimerId(
                 timerInputKey.getKey(), timerInputKey.getValue()),
             stateNamespace,
@@ -834,6 +835,7 @@ public class ExecutableStageDoFnOperatorTest {
     // Cleanup timer are rescheduled if a new timer is created during the bundle
     TimerInternals.TimerData userTimer2 =
         TimerInternals.TimerData.of(
+            "",
             TimerReceiverFactory.encodeToTimerDataTimerId(
                 timerInputKey.getKey(), timerInputKey.getValue()),
             stateNamespace,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index b71cd3c..e46ef2a 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -89,6 +89,7 @@ public class TimerReceiverFactory {
         StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
         TimerInternals.TimerData timerData =
             TimerInternals.TimerData.of(
+                timer.getDynamicTimerTag(),
                 encodeToTimerDataTimerId(timerSpec.transformId(), timerSpec.timerId()),
                 namespace,
                 timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(),
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 0059cc7..61f21dd 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -151,13 +151,13 @@ public final class PipelineTranslatorUtils {
     Timer<?> timerValue =
         Timer.of(
             currentTimerKey,
-            "",
+            timer.getTimerId(),
             Collections.singletonList(window),
             timestamp,
             outputTimestamp,
             PaneInfo.NO_FIRING);
     KV<String, String> transformAndTimerId =
-        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId());
     FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
     Preconditions.checkNotNull(
         fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index a5f6ac3..b3c3d52 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -395,9 +395,6 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
   def test_register_finalizations(self):
     raise unittest.SkipTest("BEAM-11021")
 
-  def test_pardo_dynamic_timer(self):
-    raise unittest.SkipTest("BEAM-10120")
-
   # Inherits all other tests.