You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/28 22:12:01 UTC
[beam] branch master updated: [BEAM-10120] Add dynamic timer support to portable Flink.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b8c8bf3 [BEAM-10120] Add dynamic timer support to portable Flink.
new 73731ec Merge pull request #13783 from [BEAM-10120] Add dynamic timer support to portable Flink
b8c8bf3 is described below
commit b8c8bf3644b449fc009ebe0a2a07cb155bba0989
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Jan 20 19:09:38 2021 -0800
[BEAM-10120] Add dynamic timer support to portable Flink.
---
runners/flink/job-server/flink_job_server.gradle | 1 -
.../translation/wrappers/streaming/DoFnOperator.java | 19 ++++++++++++++++---
.../streaming/ExecutableStageDoFnOperator.java | 7 +++----
.../streaming/ExecutableStageDoFnOperatorTest.java | 4 +++-
.../fnexecution/control/TimerReceiverFactory.java | 1 +
.../translation/PipelineTranslatorUtils.java | 4 ++--
.../runners/portability/flink_runner_test.py | 3 ---
7 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 130dd97..aa68411 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -163,7 +163,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 9c1e437..b463db26 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1412,6 +1412,10 @@ public class DoFnOperator<InputT, OutputT>
return timer.getOutputTimestamp().isBefore(timer.getTimestamp());
}
+ private String constructTimerId(String timerFamilyId, String timerId) {
+ return timerFamilyId + "+" + timerId;
+ }
+
@Override
public void setTimer(
StateNamespace namespace,
@@ -1437,7 +1441,10 @@ public class DoFnOperator<InputT, OutputT>
timer.getTimerId(),
timer.getTimestamp().getMillis(),
timer.getOutputTimestamp().getMillis());
- String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace());
+ String contextTimerId =
+ getContextTimerId(
+ constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+ timer.getNamespace());
@Nullable final TimerData oldTimer = pendingTimersById.get(contextTimerId);
if (!timer.equals(oldTimer)) {
// Only one timer can exist at a time for a given timer id and context.
@@ -1500,7 +1507,10 @@ public class DoFnOperator<InputT, OutputT>
*/
void onFiredOrDeletedTimer(TimerData timer) {
try {
- pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
+ pendingTimersById.remove(
+ getContextTimerId(
+ constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+ timer.getNamespace()));
if (timer.getDomain() == TimeDomain.EVENT_TIME
|| StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
if (timerUsesOutputTimestamp(timer)) {
@@ -1532,7 +1542,10 @@ public class DoFnOperator<InputT, OutputT>
@Override
@Deprecated
public void deleteTimer(TimerData timer) {
- deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getDomain());
+ deleteTimer(
+ timer.getNamespace(),
+ constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
+ timer.getDomain());
}
void deleteTimerInternal(TimerData timer) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d479a3c..42949c0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -522,8 +522,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
try (Locker locker = Locker.locked(stateBackendLock)) {
getKeyedStateBackend().setCurrentKey(encodedKey);
if (timerElement.getClearBit()) {
- timerInternals.deleteTimer(
- timerData.getNamespace(), timerData.getTimerId(), timerData.getDomain());
+ timerInternals.deleteTimer(timerData);
} else {
timerInternals.setTimer(timerData);
if (!timerData.getTimerId().equals(GC_TIMER_ID)) {
@@ -973,7 +972,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
processElement(stateValue);
} else {
KV<String, String> transformAndTimerFamilyId =
- TimerReceiverFactory.decodeTimerDataTimerId(timerId);
+ TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
LOG.debug(
"timer callback: {} {} {} {} {}",
transformAndTimerFamilyId.getKey(),
@@ -990,7 +989,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
Timer<?> timerValue =
Timer.of(
timerKey,
- "",
+ timerId,
Collections.singletonList(window),
timestamp,
outputTimestamp,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 336b414..49039a4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -510,8 +510,8 @@ public class ExecutableStageDoFnOperatorTest {
timestamp,
PaneInfo.NO_FIRING),
TimerInternals.TimerData.of(
- TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId),
"",
+ TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId),
StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow),
timestamp,
timestamp,
@@ -799,6 +799,7 @@ public class ExecutableStageDoFnOperatorTest {
// user timer that fires after the end of the window and after state cleanup
TimerInternals.TimerData userTimer =
TimerInternals.TimerData.of(
+ "",
TimerReceiverFactory.encodeToTimerDataTimerId(
timerInputKey.getKey(), timerInputKey.getValue()),
stateNamespace,
@@ -834,6 +835,7 @@ public class ExecutableStageDoFnOperatorTest {
// Cleanup timer are rescheduled if a new timer is created during the bundle
TimerInternals.TimerData userTimer2 =
TimerInternals.TimerData.of(
+ "",
TimerReceiverFactory.encodeToTimerDataTimerId(
timerInputKey.getKey(), timerInputKey.getValue()),
stateNamespace,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index b71cd3c..e46ef2a 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -89,6 +89,7 @@ public class TimerReceiverFactory {
StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
TimerInternals.TimerData timerData =
TimerInternals.TimerData.of(
+ timer.getDynamicTimerTag(),
encodeToTimerDataTimerId(timerSpec.transformId(), timerSpec.timerId()),
namespace,
timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(),
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 0059cc7..61f21dd 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -151,13 +151,13 @@ public final class PipelineTranslatorUtils {
Timer<?> timerValue =
Timer.of(
currentTimerKey,
- "",
+ timer.getTimerId(),
Collections.singletonList(window),
timestamp,
outputTimestamp,
PaneInfo.NO_FIRING);
KV<String, String> transformAndTimerId =
- TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+ TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId());
FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
Preconditions.checkNotNull(
fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index a5f6ac3..b3c3d52 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -395,9 +395,6 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
def test_register_finalizations(self):
raise unittest.SkipTest("BEAM-11021")
- def test_pardo_dynamic_timer(self):
- raise unittest.SkipTest("BEAM-10120")
-
# Inherits all other tests.