You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/03 16:33:43 UTC
[beam] branch master updated: Update Timer encoding
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 151b2f8 Update Timer encoding
new fff532e Merge pull request #11199 from boyuanzz/timer_encoding
151b2f8 is described below
commit 151b2f80997810ed9311124d2831b70a51d1c07a
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Mar 23 13:00:43 2020 -0700
Update Timer encoding
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 9 +-
.../beam/model/fnexecution/v1/standard_coders.yaml | 42 ++++-
.../pipeline/src/main/proto/beam_runner_api.proto | 16 +-
.../core/construction/CoderTranslators.java | 4 +-
.../core/construction/ParDoTranslation.java | 13 +-
.../beam/runners/core/construction/Timer.java | 196 +++++++++++++++++----
.../core/construction/CoderTranslationTest.java | 2 +-
.../runners/core/construction/CommonCoderTest.java | 33 +++-
.../core/construction/ParDoTranslationTest.java | 6 +-
.../beam/runners/core/construction/TimerTest.java | 142 ++++++++++++---
.../streaming/ExecutableStageDoFnOperator.java | 13 +-
.../dataflow/worker/fn/control/TimerReceiver.java | 16 +-
.../worker/fn/control/TimerReceiverTest.java | 10 +-
.../fnexecution/control/TimerReceiverFactory.java | 2 +-
.../translation/PipelineTranslatorUtils.java | 11 +-
.../fnexecution/control/RemoteExecutionTest.java | 22 ++-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 13 +-
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 11 +-
sdks/python/apache_beam/coders/coder_impl.py | 49 +++++-
sdks/python/apache_beam/coders/coders.py | 20 ++-
.../apache_beam/coders/coders_test_common.py | 33 ++--
.../apache_beam/coders/standard_coders_test.py | 25 ++-
.../apache_beam/runners/worker/bundle_processor.py | 24 ++-
.../apache_beam/runners/worker/operations.py | 2 +-
sdks/python/apache_beam/transforms/userstate.py | 16 +-
25 files changed, 577 insertions(+), 153 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 6590b88..68f2ceb 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -609,12 +609,13 @@ message Elements {
// represents the producer of these timers.
string transform_id = 2;
- // (Optional) The local timer name used to identify the associated timer specification.
- string timer_id = 3;
+ // (Optional) The local timer family name used to identify the associated
+ // timer family specification
+ string timer_family_id = 3;
- // (Optional) Represents a logical byte stream of a timer. Encoded according
+ // (Optional) Represents a logical byte stream of timers. Encoded according
// to the coder in the timer spec.
- bytes timer = 4;
+ bytes timers = 4;
// (Optional) Set this bit to indicate the this is the last data block
// for the given instruction and transform, ending the stream.
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index c9a4288..7afa00b 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -28,6 +28,23 @@
# one of a few standard JSON types such as numbers, strings, dicts that map naturally
# to the type encoded by the coder.
#
+# Java code snippet to generate example bytes:
+# Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+# Instant now = new Instant(1000L);
+# Timer<String> timer = Timer.of(
+# "key",
+# "tag",
+# Collections.singletonList(GlobalWindow.INSTANCE),
+# now,
+# now,
+# PaneInfo.NO_FIRING);
+# byte[] bytes = CoderUtils.encodeToByteArray(coder, timer);
+# String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
+# String example = "";
+# for(int i = 0; i < str.length(); i++){
+# example += CharUtils.unicodeEscaped(str.charAt(i));
+# }
+#
# These choices were made to strike a balance between portability, ease of use, and simple
# legibility of this file itself.
#
@@ -207,15 +224,24 @@ examples:
coder:
urn: "beam:coder:timer:v1"
- components: [{urn: "beam:coder:bytes:v1"}]
+ components: [{urn: "beam:coder:string_utf8:v1"},
+ {urn: "beam:coder:global_window:v1"}]
examples:
- "\0\0\0\0\0\0\0\0\u0003abc": {timestamp: -9223372036854775808, payload: abc}
- "\x7fÿÿÿÿÿÿ\x01\u0003abc": {timestamp: -255, payload: abc}
- "\x7fÿÿÿÿÿÿÿ\u0003abc": {timestamp: -1, payload: abc}
- "\x80\0\0\0\0\0\0\0\u0003abc": {timestamp: 0, payload: abc}
- "\x80\0\0\0\0\0\0\x01\u0003abc": {timestamp: 1, payload: abc}
- "\x80\0\0\0\0\0\x01\0\u0003abc": {timestamp: 256, payload: abc}
- "ÿÿÿÿÿÿÿÿ\u0003abc": {timestamp: 9223372036854775807, payload: abc}
+ "\u0003\u006b\u0065\u0079\u0003\u0074\u0061\u0067\u0000\u0000\u0000\u0001\u0000\u0080\u0000\u0000\u0000\u0000\u0000\u0004\u00d2\u0080\u0000\u0000\u0000\u0000\u0000\u0016\u002e\u000f": {
+ userKey: key,
+ dynamicTimerTag: tag,
+ windows: ["global"],
+ clearBit: False,
+ fireTimestamp: 1234,
+ holdTimestamp: 5678,
+ pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+ }
+ "\u0003\u006b\u0065\u0079\u0003\u0074\u0061\u0067\u0000\u0000\u0000\u0001\u0001": {
+ userKey: key,
+ dynamicTimerTag: tag,
+ windows: ["global"],
+ clearBit: True,
+ }
---
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 17229d2..7d63501 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -693,9 +693,14 @@ message StandardCoders {
// Components: Coder for a single element.
ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];
- // Encodes a timer containing a timestamp and a user specified payload.
- // The encoding is represented as: timestamp payload
- // timestamp - a big endian 8 byte integer representing millis-since-epoch.
+ // Encodes a timer containing a user key, a dynamic timer tag, a clear bit,
+ // a fire timestamp, a hold timestamp, the windows and the paneinfo.
+ // The encoding is represented as:
+ // user key - user defined key, uses the component coder.
+ // dynamic timer tag - a string which identifies a timer.
+ // windows - uses component coders.
+ // clear bit - a boolean set for clearing the timer.
+ // fire timestamp - a big endian 8 byte integer representing millis-since-epoch.
// The encoded representation is shifted so that the byte representation of
// negative values are lexicographically ordered before the byte representation
// of positive values. This is typically done by subtracting -9223372036854775808
@@ -708,8 +713,9 @@ message StandardCoders {
// 1: 80 00 00 00 00 00 00 01
// 256: 80 00 00 00 00 00 01 00
// 9223372036854775807: FF FF FF FF FF FF FF FF
- // payload - user defined data, uses the component coder
- // Components: Coder for the payload.
+ // hold timestamp - similar to the fire timestamp.
+ // paneinfo - similar to the paneinfo of the windowed_value.
+ // Components: Coder for the key and windows.
TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
/*
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
index d7abf4e..ae7e9dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -86,12 +86,12 @@ class CoderTranslators {
return new SimpleStructuredCoderTranslator<Timer.Coder<?>>() {
@Override
public List<? extends Coder<?>> getComponents(Timer.Coder<?> from) {
- return from.getCoderArguments();
+ return from.getComponents();
}
@Override
public Timer.Coder<?> fromComponents(List<Coder<?>> components) {
- return Timer.Coder.of(components.get(0));
+ return Timer.Coder.of(components.get(0), (Coder<BoundedWindow>) components.get(1));
}
};
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 969ae25..d74c4e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.Transform
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -70,6 +70,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -195,9 +196,7 @@ public class ParDoTranslation {
mainInput.isBounded(),
KvCoder.of(
((KvCoder) mainInput.getCoder()).getKeyCoder(),
- // TODO: Add support for timer payloads to the SDK
- // We currently assume that all payloads are unspecified.
- Timer.Coder.of(VoidCoder.of())));
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)));
timerPCollection.setName(
String.format("%s.%s", appliedPTransform.getFullName(), localTimerName));
String timerPCollectionId = components.registerPCollection(timerPCollection);
@@ -656,12 +655,14 @@ public class ParDoTranslation {
throw new RuntimeException("Failure to register coder", exc);
}
}
-
+ // TODO(BEAM-9562): Plumb through actual keyCoder and windowCoder.
public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
TimerSpec timer, SdkComponents components) {
return RunnerApi.TimerFamilySpec.newBuilder()
.setTimeDomain(translateTimeDomain(timer.getTimeDomain()))
- .setTimerFamilyCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of())))
+ .setTimerFamilyCoderId(
+ registerCoderOrThrow(
+ components, Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)))
.build();
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 66afe7c..158d471 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -21,17 +21,29 @@ import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.joda.time.Instant;
/**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a user key, a dynamic timer tag, a set of windows and either a bit that says
+ * that this timer should be cleared or data representing the firing timestamp, hold timestamp and
+ * pane information that should be used when producing output.
*
* <p>Note that this is an implementation helper specifically intended for use during execution by
* runners and the Java SDK harness. The API for pipeline authors is {@link
@@ -40,92 +52,198 @@ import org.joda.time.Instant;
@AutoValue
public abstract class Timer<T> {
- /** Returns a timer for the given timestamp with a {@code null} payload. */
- public static Timer<Void> of(Instant time) {
- return of(time, (Void) null);
+ /**
+ * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+ * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+ */
+ public static <T> Timer<T> of(
+ T userKey,
+ String dynamicTimerTag,
+ Collection<? extends BoundedWindow> windows,
+ Instant fireTimestamp,
+ Instant holdTimestamp,
+ PaneInfo pane) {
+ return new AutoValue_Timer(
+ userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
}
- /** Returns a timer for the given timestamp with a user specified payload. */
- public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
- return new AutoValue_Timer(timestamp, payload);
+ /**
+ * Returns a cleared timer for the given {@code userKey}, {@code dynamicTimerTag} and {@code
+ * windows}.
+ */
+ public static <T> Timer<T> cleared(
+ T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+ return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
}
+ /** Returns the key that the timer is set on. */
+ public abstract T getUserKey();
+
/**
- * Returns the timestamp of when the timer is scheduled to fire.
+ * Returns the tag that the timer is set on. The tag is {@code ""} when the timer is for a {@link
+ * TimerSpec}.
+ */
+ public abstract String getDynamicTimerTag();
+
+ /** Returns the windows which are associated with the timer. */
+ public abstract Collection<? extends BoundedWindow> getWindows();
+
+ /** Returns whether the timer is going to be cleared. */
+ public abstract boolean getClearBit();
+
+ /**
+ * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+ * the timer is being cleared.
*
- * <p>The time is relative to the time domain defined in the {@link
+ * <p>The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
* timer.
*/
- public abstract Instant getTimestamp();
+ @Nullable
+ public abstract Instant getFireTimestamp();
+
+ /**
+ * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+ * the timer is being cleared.
+ */
+ @Nullable
+ public abstract Instant getHoldTimestamp();
- /** A user supplied payload. */
+ /**
+ * Returns the {@link PaneInfo} that is related to the timer. This field is nullable only when the
+ * timer is being cleared.
+ */
@Nullable
- public abstract T getPayload();
+ public abstract PaneInfo getPane();
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Timer)) {
+ return false;
+ }
+ Timer<?> that = (Timer<?>) other;
+ return Objects.equals(this.getUserKey(), that.getUserKey())
+ && Objects.equals(this.getDynamicTimerTag(), that.getDynamicTimerTag())
+ && Objects.equals(this.getWindows(), that.getWindows())
+ && (this.getClearBit() == that.getClearBit())
+ && Objects.equals(this.getFireTimestamp(), that.getFireTimestamp())
+ && Objects.equals(this.getHoldTimestamp(), that.getHoldTimestamp())
+ && Objects.equals(this.getPane(), that.getPane());
+ }
+
+ @Override
+ public int hashCode() {
+ // Hash only the millis of the timestamp to be consistent with equals
+ if (getClearBit()) {
+ return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+ }
+ return Objects.hash(
+ getUserKey(),
+ getDynamicTimerTag(),
+ getClearBit(),
+ getFireTimestamp().getMillis(),
+ getHoldTimestamp().getMillis(),
+ getWindows(),
+ getPane());
+ }
/**
* A {@link org.apache.beam.sdk.coders.Coder} for timers.
*
- * <p>This coder is deterministic if the payload coder is deterministic.
+ * <p>This coder is deterministic if both the key coder and window coder are deterministic.
*
- * <p>This coder is inexpensive for size estimation of elements if the payload coder is
- * inexpensive for size estimation.
+ * <p>This coder is inexpensive for size estimation of elements if the key coder and window coder
+ * are inexpensive for size estimation.
*/
public static class Coder<T> extends StructuredCoder<Timer<T>> {
- public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
- return new Coder(payloadCoder);
+ public static <T> Coder<T> of(
+ org.apache.beam.sdk.coders.Coder<T> keyCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+ return new Coder<>(keyCoder, windowCoder);
}
- private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
-
- private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
- this.payloadCoder = payloadCoder;
+ private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+ private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+ windowsCoder;
+ private final org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder;
+
+ private Coder(
+ org.apache.beam.sdk.coders.Coder<T> keyCoder,
+ org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+ this.windowCoder = windowCoder;
+ this.keyCoder = keyCoder;
+ this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
}
@Override
public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
- InstantCoder.of().encode(timer.getTimestamp(), outStream);
- payloadCoder.encode(timer.getPayload(), outStream);
+ keyCoder.encode(timer.getUserKey(), outStream);
+ StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+ windowsCoder.encode(timer.getWindows(), outStream);
+ BooleanCoder.of().encode(timer.getClearBit(), outStream);
+ if (!timer.getClearBit()) {
+ InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+ InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+ PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+ }
}
@Override
public Timer<T> decode(InputStream inStream) throws CoderException, IOException {
- Instant instant = InstantCoder.of().decode(inStream);
- T value = payloadCoder.decode(inStream);
- return Timer.of(instant, value);
+ T userKey = keyCoder.decode(inStream);
+ String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+ Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream);
+ boolean clearBit = BooleanCoder.of().decode(inStream);
+ if (clearBit) {
+ return Timer.cleared(userKey, dynamicTimerTag, windows);
+ }
+ Instant fireTimestamp = InstantCoder.of().decode(inStream);
+ Instant holdTimestamp = InstantCoder.of().decode(inStream);
+ PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
+ return Timer.of(userKey, dynamicTimerTag, windows, fireTimestamp, holdTimestamp, pane);
}
@Override
public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
- return Collections.singletonList(payloadCoder);
+ return Collections.singletonList(keyCoder);
}
@Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(this, "Payload coder must be deterministic", payloadCoder);
+ public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
+ return Arrays.asList(keyCoder, windowCoder);
}
- @Override
- public boolean consistentWithEquals() {
- return payloadCoder.consistentWithEquals();
+ public org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> getWindowCoder() {
+ return windowCoder;
}
- @Override
- public Object structuralValue(Timer<T> value) {
- return Timer.of(value.getTimestamp(), payloadCoder.structuralValue(value.getPayload()));
+ public org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>> getWindowsCoder() {
+ return windowsCoder;
+ }
+
+ public org.apache.beam.sdk.coders.Coder<T> getValueCoder() {
+ return keyCoder;
}
@Override
- public boolean isRegisterByteSizeObserverCheap(Timer<T> value) {
- return payloadCoder.isRegisterByteSizeObserverCheap(value.getPayload());
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "UserKey coder must be deterministic", keyCoder);
+ verifyDeterministic(this, "Window coder must be deterministic", windowCoder);
}
@Override
public void registerByteSizeObserver(Timer<T> value, ElementByteSizeObserver observer)
throws Exception {
- InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer);
- payloadCoder.registerByteSizeObserver(value.getPayload(), observer);
+ keyCoder.registerByteSizeObserver(value.getUserKey(), observer);
+ StringUtf8Coder.of().registerByteSizeObserver(value.getDynamicTimerTag(), observer);
+ windowsCoder.registerByteSizeObserver(value.getWindows(), observer);
+ BooleanCoder.of().registerByteSizeObserver(value.getClearBit(), observer);
+ if (!value.getClearBit()) {
+ InstantCoder.of().registerByteSizeObserver(value.getFireTimestamp(), observer);
+ InstantCoder.of().registerByteSizeObserver(value.getHoldTimestamp(), observer);
+ PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer);
+ }
}
}
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index b2adc2b..e6b45c4 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -74,7 +74,7 @@ public class CoderTranslationTest {
.add(StringUtf8Coder.of())
.add(IntervalWindowCoder.of())
.add(IterableCoder.of(ByteArrayCoder.of()))
- .add(Timer.Coder.of(ByteArrayCoder.of()))
+ .add(Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE))
.add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of())))
.add(GlobalWindow.Coder.INSTANCE)
.add(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 2ac9db5..fe08582 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -255,10 +255,34 @@ public class CommonCoderTest {
return ((Number) value).longValue();
} else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
Map<String, Object> kvMap = (Map<String, Object>) value;
- Coder<?> payloadCoder = (Coder) coder.getCoderArguments().get(0);
+ Coder<?> keyCoder = ((Timer.Coder) coder).getValueCoder();
+ Coder<? extends BoundedWindow> windowCoder = ((Timer.Coder) coder).getWindowCoder();
+ List<BoundedWindow> windows = new ArrayList<>();
+ for (Object window : (List<Object>) kvMap.get("windows")) {
+ windows.add(
+ (BoundedWindow) convertValue(window, coderSpec.getComponents().get(1), windowCoder));
+ }
+ if ((boolean) kvMap.get("clearBit")) {
+ return Timer.cleared(
+ convertValue(kvMap.get("userKey"), coderSpec.getComponents().get(0), keyCoder),
+ (String) kvMap.get("dynamicTimerTag"),
+ windows);
+ }
+ Map<String, Object> paneInfoMap = (Map<String, Object>) kvMap.get("pane");
+ PaneInfo paneInfo =
+ PaneInfo.createPane(
+ (boolean) paneInfoMap.get("is_first"),
+ (boolean) paneInfoMap.get("is_last"),
+ PaneInfo.Timing.valueOf((String) paneInfoMap.get("timing")),
+ (int) paneInfoMap.get("index"),
+ (int) paneInfoMap.get("on_time_index"));
return Timer.of(
- new Instant(((Number) kvMap.get("timestamp")).longValue()),
- convertValue(kvMap.get("payload"), coderSpec.getComponents().get(0), payloadCoder));
+ convertValue(kvMap.get("userKey"), coderSpec.getComponents().get(0), keyCoder),
+ (String) kvMap.get("dynamicTimerTag"),
+ windows,
+ new Instant(((Number) kvMap.get("fireTimestamp")).longValue()),
+ new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
+ paneInfo);
} else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
Map<String, Object> kvMap = (Map<String, Object>) value;
Instant end = new Instant(((Number) kvMap.get("end")).longValue());
@@ -434,8 +458,7 @@ public class CommonCoderTest {
assertFalse(expectedValueIterator.hasNext());
} else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
- assertEquals(((Timer) expectedValue).getTimestamp(), ((Timer) actualValue).getTimestamp());
- assertThat(((Timer) expectedValue).getPayload(), equalTo(((Timer) actualValue).getPayload()));
+ assertEquals((Timer) expectedValue, (Timer) actualValue);
} else if (s.equals(getUrn(StandardCoders.Enum.GLOBAL_WINDOW))) {
assertEquals(expectedValue, actualValue);
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 4e634d0..de961b5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.BagState;
@@ -59,6 +58,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -207,8 +207,10 @@ public class ParDoTranslationTest {
CoderTranslation.fromProto(
components.getCodersOrThrow(timerKvCoderComponents.valueCoderId()),
rehydratedComponents);
+ // TODO(BEAM-9562) Plump through actual Timer value coder.
assertEquals(
- org.apache.beam.runners.core.construction.Timer.Coder.of(VoidCoder.of()),
+ org.apache.beam.runners.core.construction.Timer.Coder.of(
+ StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
timerValueCoder);
}
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
index ac3bd70..f3d12e8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
@@ -17,14 +17,16 @@
*/
package org.apache.beam.runners.core.construction;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -33,42 +35,128 @@ import org.junit.runners.JUnit4;
/** Tests for {@link Timer}. */
@RunWith(JUnit4.class)
public class TimerTest {
- private static final Instant INSTANT = Instant.now();
+ private static final Instant FIRE_TIME = new Instant(123L);
+ private static final Instant HOLD_TIME = new Instant(456L);
@Test
- public void testTimer() {
- Timer<Void> timerA = Timer.of(INSTANT);
- assertEquals(INSTANT, timerA.getTimestamp());
- assertNull(timerA.getPayload());
+ public void testClearTimer() {
+ Timer<String> clearedTimer =
+ Timer.cleared("timer", "tag", Collections.singleton(GlobalWindow.INSTANCE));
+ assertTrue(clearedTimer.getClearBit());
+ assertEquals("timer", clearedTimer.getUserKey());
+ assertEquals("tag", clearedTimer.getDynamicTimerTag());
+ assertEquals(Collections.singleton(GlobalWindow.INSTANCE), clearedTimer.getWindows());
+ }
- Timer<String> timerB = Timer.of(INSTANT, "ABC");
- assertEquals(INSTANT, timerB.getTimestamp());
- assertEquals("ABC", timerB.getPayload());
+ @Test
+ public void testTimer() {
+ Timer<String> timer =
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING);
+ assertEquals("key", timer.getUserKey());
+ assertEquals("tag", timer.getDynamicTimerTag());
+ assertEquals(FIRE_TIME, timer.getFireTimestamp());
+ assertEquals(HOLD_TIME, timer.getHoldTimestamp());
+ assertEquals(Collections.singleton(GlobalWindow.INSTANCE), timer.getWindows());
+ assertEquals(PaneInfo.NO_FIRING, timer.getPane());
+ assertFalse(timer.getClearBit());
}
@Test
- public void testTimerCoderWithInconsistentWithEqualsPayloadCoder() throws Exception {
- Coder<Timer<byte[]>> coder = Timer.Coder.of(ByteArrayCoder.of());
+ public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exception {
+ Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
CoderProperties.coderSerializable(coder);
CoderProperties.structuralValueDecodeEncodeEqual(
- coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+ coder,
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING));
+ CoderProperties.structuralValueDecodeEncodeEqual(
+ coder, Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)));
CoderProperties.structuralValueConsistentWithEquals(
- coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)), Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+ coder,
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING),
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING));
+ CoderProperties.structuralValueConsistentWithEquals(
+ coder,
+ Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)),
+ Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)));
}
@Test
- public void testTimerCoderWithConsistentWithEqualsPayloadCoder() throws Exception {
- Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
- CoderProperties.coderDecodeEncodeEqual(coder, Timer.of(INSTANT, "ABC"));
+ public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Exception {
+ Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+ CoderProperties.coderDecodeEncodeEqual(
+ coder,
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING));
+ CoderProperties.coderDecodeEncodeEqual(
+ coder, Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
CoderProperties.coderConsistentWithEquals(
- coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
- CoderProperties.coderDeterministic(coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
- }
-
- @Test
- public void testTimerCoderWireFormat() throws Exception {
- Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
- CoderProperties.coderEncodesBase64(
- coder, Timer.of(new Instant(255L), "ABC"), "gAAAAAAAAP8DQUJD");
+ coder,
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING),
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING));
+ CoderProperties.coderConsistentWithEquals(
+ coder,
+ Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)),
+ Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
+ CoderProperties.coderDeterministic(
+ coder,
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING),
+ Timer.of(
+ "key",
+ "tag",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ FIRE_TIME,
+ HOLD_TIME,
+ PaneInfo.NO_FIRING));
+ CoderProperties.coderDeterministic(
+ coder,
+ Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)),
+ Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 72e5c5d..7e5287a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -685,7 +686,15 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
timerId);
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
- KV.of(timerKey, Timer.of(timestamp, new byte[0])),
+ KV.of(
+ timerKey,
+ Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ timestamp,
+ timestamp,
+ PaneInfo.NO_FIRING)),
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
@@ -749,7 +758,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
TimerInternals.TimerData.of(
timerSpec.inputCollectionId(),
namespace,
- timer.getTimestamp(),
+ timer.getFireTimestamp(),
timerSpec.getTimerSpec().getTimeDomain());
timerRegistration.accept(windowedValue, timerData);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 89b2b7d..c8b25ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -120,10 +120,14 @@ public class TimerReceiver {
TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
timerInternals.setTimer(
- namespace, timerId, "", timer.getTimestamp(), windowedValue.getTimestamp(), timeDomain);
+ namespace,
+ timerId,
+ "",
+ timer.getFireTimestamp(),
+ windowedValue.getTimestamp(),
+ timeDomain);
timerIdToKey.put(timerId, windowedValue.getValue().getKey());
- timerIdToPayload.put(timerId, timer.getPayload());
}
return true;
}
@@ -143,7 +147,13 @@ public class TimerReceiver {
WindowedValue.of(
KV.of(
timerIdToKey.get(timerData.getTimerId()),
- Timer.of(timerData.getTimestamp(), timerIdToPayload.get(timerData.getTimerId()))),
+ Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ timerData.getOutputTimestamp(),
+ timerData.getOutputTimestamp(),
+ PaneInfo.NO_FIRING)),
timerData.getOutputTimestamp(),
Collections.singleton(window),
PaneInfo.NO_FIRING);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
index 550115d..56a771b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import static org.junit.Assert.assertTrue;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -67,7 +68,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -507,13 +507,17 @@ public class TimerReceiverTest implements Serializable {
}
}
- private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
+ private KV<String, org.apache.beam.runners.core.construction.Timer<String>> timerBytes(
String key, long timestampOffset) throws CoderException {
return KV.of(
key,
org.apache.beam.runners.core.construction.Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
- CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED)));
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
+ PaneInfo.NO_FIRING));
}
private static DataflowExecutionContext.DataflowStepContext buildDataflowStepContext() {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index a277b9e..1a1da66 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -82,7 +82,7 @@ public class TimerReceiverFactory implements OutputReceiverFactory {
TimeDomain timeDomain = timerSpec.getTimerSpec().getTimeDomain();
String timerId = timerSpec.inputCollectionId();
TimerInternals.TimerData timerData =
- TimerInternals.TimerData.of(timerId, namespace, timer.getTimestamp(), timeDomain);
+ TimerInternals.TimerData.of(timerId, namespace, timer.getFireTimestamp(), timeDomain);
timerDataConsumer.accept(windowedValue, timerData);
}
};
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 56b8179..d67776c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -142,7 +143,15 @@ public final class PipelineTranslatorUtils {
Instant outputTimestamp = timer.getOutputTimestamp();
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
- KV.of(currentTimerKey, Timer.of(timestamp, new byte[0])),
+ KV.of(
+ currentTimerKey,
+ Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ timestamp,
+ timestamp,
+ PaneInfo.NO_FIRING)),
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 365f679..b2a8c1f 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -74,12 +74,10 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSi
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
@@ -105,6 +103,7 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -1251,25 +1250,32 @@ public class RemoteExecutionTest implements Serializable {
return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
}
- private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
+ private KV<String, org.apache.beam.runners.core.construction.Timer<String>> timerBytes(
String key, long timestampOffset) throws CoderException {
return KV.of(
key,
org.apache.beam.runners.core.construction.Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
- CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED)));
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
+ PaneInfo.NO_FIRING));
}
- private Object timerStructuralValue(Object timer) {
+ private Object timerStructuralValue(WindowedValue<?> timer) {
return WindowedValue.FullWindowedValueCoder.of(
KvCoder.of(
StringUtf8Coder.of(),
- org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())),
+ org.apache.beam.runners.core.construction.Timer.Coder.of(
+ StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)),
GlobalWindow.Coder.INSTANCE)
- .structuralValue(timer);
+ .structuralValue(
+ (WindowedValue<KV<String, org.apache.beam.runners.core.construction.Timer<String>>>)
+ timer);
}
- private Collection<Object> timerStructuralValues(Collection<?> timers) {
+ private Collection<Object> timerStructuralValues(Collection<WindowedValue<?>> timers) {
return Collections2.transform(timers, this::timerStructuralValue);
}
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 49aa850..8c20237 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -25,6 +25,7 @@ import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;
@@ -974,7 +975,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
outputTo(
consumers,
WindowedValue.of(
- KV.of(key, Timer.of(scheduledTime)),
+ KV.of(
+ key,
+ Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ scheduledTime,
+ scheduledTime,
+ PaneInfo.NO_FIRING)),
currentOutputTimestamp,
currentElementOrTimer.getWindows(),
currentElementOrTimer.getPane()));
@@ -1362,7 +1371,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public Instant fireTimestamp() {
- return currentTimer.getValue().getValue().getTimestamp();
+ return currentTimer.getValue().getValue().getFireTimestamp();
}
@Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 7ed8b82..bf0716f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -33,6 +33,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import org.apache.beam.fn.harness.control.BundleSplitListener;
@@ -988,7 +989,15 @@ public class FnApiDoFnRunnerTest implements Serializable {
WindowedValue<KV<T, org.apache.beam.runners.core.construction.Timer>> timerInGlobalWindow(
T value, Instant valueTimestamp, Instant scheduledTimestamp) {
return timestampedValueInGlobalWindow(
- KV.of(value, org.apache.beam.runners.core.construction.Timer.of(scheduledTimestamp)),
+ KV.of(
+ value,
+ org.apache.beam.runners.core.construction.Timer.of(
+ "",
+ "",
+ Collections.singleton(GlobalWindow.INSTANCE),
+ scheduledTimestamp,
+ scheduledTimestamp,
+ PaneInfo.NO_FIRING)),
valueTimestamp);
}
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index a2090a9..8fdc4a4 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -629,22 +629,55 @@ class TimestampCoderImpl(StreamCoderImpl):
class TimerCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
- def __init__(self, payload_coder_impl):
+ def __init__(self, key_coder_impl, window_coder_impl):
self._timestamp_coder_impl = TimestampCoderImpl()
- self._payload_coder_impl = payload_coder_impl
+ self._boolean_coder_impl = BooleanCoderImpl()
+ self._pane_info_coder_impl = PaneInfoCoderImpl()
+ self._key_coder_impl = key_coder_impl
+ self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+ from apache_beam.coders.coders import StrUtf8Coder
+ self._tag_coder_impl = StrUtf8Coder().get_impl()
def encode_to_stream(self, value, out, nested):
# type: (dict, create_OutputStream, bool) -> None
- self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
- self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+ self._key_coder_impl.encode_to_stream(value.user_key, out, True)
+ self._tag_coder_impl.encode_to_stream(value.dynamic_timer_tag, out, True)
+ self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+ self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+ if not value.clear_bit:
+ self._timestamp_coder_impl.encode_to_stream(
+ value.fire_timestamp, out, True)
+ self._timestamp_coder_impl.encode_to_stream(
+ value.hold_timestamp, out, True)
+ self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
def decode_from_stream(self, in_stream, nested):
# type: (create_InputStream, bool) -> dict
- # TODO(robertwb): Consider using a concrete class rather than a dict here.
- return dict(
- timestamp=self._timestamp_coder_impl.decode_from_stream(
+ from apache_beam.transforms import userstate
+ user_key = self._key_coder_impl.decode_from_stream(in_stream, True)
+ dynamic_timer_tag = self._tag_coder_impl.decode_from_stream(in_stream, True)
+ windows = self._windows_coder_impl.decode_from_stream(in_stream, True)
+ clear_bit = self._boolean_coder_impl.decode_from_stream(in_stream, True)
+ if clear_bit:
+ return userstate.Timer(
+ user_key=user_key,
+ dynamic_timer_tag=dynamic_timer_tag,
+ windows=windows,
+ clear_bit=clear_bit,
+ fire_timestamp=None,
+ hold_timestamp=None,
+ paneinfo=None)
+
+ return userstate.Timer(
+ user_key=user_key,
+ dynamic_timer_tag=dynamic_timer_tag,
+ windows=windows,
+ clear_bit=clear_bit,
+ fire_timestamp=self._timestamp_coder_impl.decode_from_stream(
in_stream, True),
- payload=self._payload_coder_impl.decode_from_stream(in_stream, True))
+ hold_timestamp=self._timestamp_coder_impl.decode_from_stream(
+ in_stream, True),
+ paneinfo=self._pane_info_coder_impl.decode_from_stream(in_stream, True))
small_ints = [chr(_).encode('latin-1') for _ in range(128)]
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index b14ccf0..e7ce77b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -588,28 +588,32 @@ class _TimerCoder(FastCoder):
"""A coder used for timer values.
For internal use."""
- def __init__(self, payload_coder):
+ def __init__(self, key_coder, window_coder):
# type: (Coder) -> None
- self._payload_coder = payload_coder
+ self._key_coder = key_coder
+ self._window_coder = window_coder
def _get_component_coders(self):
# type: () -> List[Coder]
- return [self._payload_coder]
+ return [self._key_coder, self._window_coder]
def _create_impl(self):
- return coder_impl.TimerCoderImpl(self._payload_coder.get_impl())
+ return coder_impl.TimerCoderImpl(
+ self._key_coder.get_impl(), self._window_coder.get_impl())
def is_deterministic(self):
# () -> bool
- return self._payload_coder.is_deterministic()
+ return (
+ self._key_coder.is_deterministic() and
+ self._window_coder.is_deterministic())
def __eq__(self, other):
return (
- type(self) == type(other) and
- self._payload_coder == other._payload_coder)
+ type(self) == type(other) and self._key_coder == other._key_coder and
+ self._window_coder == other._window_coder)
def __hash__(self):
- return hash(type(self)) + hash(self._payload_coder)
+ return hash(type(self)) + hash(self._key_coder) + hash(self._window_coder)
Coder.register_structured_urn(common_urns.coders.TIMER.urn, _TimerCoder)
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index fd21261..63dde4e 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -32,6 +32,7 @@ from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.internal import pickler
from apache_beam.runners import pipeline_context
+from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils import timestamp
@@ -80,8 +81,9 @@ class CodersTest(unittest.TestCase):
coders.RunnerAPICoderHolder,
coders.ToBytesCoder
])
- assert not standard - cls.seen, standard - cls.seen
- assert not standard - cls.seen_nested, standard - cls.seen_nested
+ cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
+ assert not standard - cls.seen
+ assert not cls.seen_nested - standard
@classmethod
def _observe(cls, coder):
@@ -229,14 +231,25 @@ class CodersTest(unittest.TestCase):
def test_timer_coder(self):
self.check_coder(
- coders._TimerCoder(coders.BytesCoder()),
- *[{
- 'timestamp': timestamp.Timestamp(micros=x), 'payload': b'xyz'
- } for x in (-3000, 0, 3000)])
- self.check_coder(
- coders.TupleCoder((coders._TimerCoder(coders.VarIntCoder()), )), ({
- 'timestamp': timestamp.Timestamp.of(37000), 'payload': 389
- }, ))
+ coders._TimerCoder(coders.StrUtf8Coder(), coders.GlobalWindowCoder()),
+ *[
+ userstate.Timer(
+ user_key="key",
+ dynamic_timer_tag="tag",
+ windows=(GlobalWindow(), ),
+ clear_bit=True,
+ fire_timestamp=None,
+ hold_timestamp=None,
+ paneinfo=None),
+ userstate.Timer(
+ user_key="key",
+ dynamic_timer_tag="tag",
+ windows=(GlobalWindow(), ),
+ clear_bit=False,
+ fire_timestamp=timestamp.Timestamp.of(123),
+ hold_timestamp=timestamp.Timestamp.of(456),
+ paneinfo=windowed_value.PANE_INFO_UNKNOWN)
+ ])
def test_tuple_coder(self):
kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 9e118a0..0cc5271 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -38,6 +38,7 @@ from apache_beam.coders import coder_impl
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
+from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.transforms.window import IntervalWindow
from apache_beam.typehints import schemas
@@ -145,9 +146,27 @@ class StandardCodersTest(unittest.TestCase):
x['pane']['index'],
x['pane']['on_time_index'])),
'beam:coder:timer:v1': lambda x,
- payload_parser: dict(
- payload=payload_parser(x['payload']),
- timestamp=Timestamp(micros=x['timestamp'] * 1000)),
+ value_parser,
+ window_parser: userstate.Timer(
+ user_key=value_parser(x['userKey']),
+ dynamic_timer_tag=x['dynamicTimerTag'],
+ clear_bit=x['clearBit'],
+ windows=tuple([window_parser(w) for w in x['windows']]),
+ fire_timestamp=None,
+ hold_timestamp=None,
+ paneinfo=None) if x['clearBit'] else userstate.Timer(
+ user_key=value_parser(x['userKey']),
+ dynamic_timer_tag=x['dynamicTimerTag'],
+ clear_bit=x['clearBit'],
+ fire_timestamp=Timestamp(micros=x['fireTimestamp'] * 1000),
+ hold_timestamp=Timestamp(micros=x['holdTimestamp'] * 1000),
+ windows=tuple([window_parser(w) for w in x['windows']]),
+ paneinfo=PaneInfo(
+ x['pane']['is_first'],
+ x['pane']['is_last'],
+ PaneInfoTiming.from_string(x['pane']['timing']),
+ x['pane']['index'],
+ x['pane']['on_time_index'])),
'beam:coder:double:v1': parse_float,
}
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 9180695..b0f7024 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -580,16 +580,36 @@ class OutputTimer(object):
def set(self, ts):
ts = timestamp.Timestamp.of(ts)
+ # TODO(BEAM-9562): Plumb through actual timer fields.
self._receiver.receive(
- windowed_value.WindowedValue((self._key, dict(timestamp=ts)),
+ windowed_value.WindowedValue((
+ self._key,
+ userstate.Timer(
+ user_key='',
+ dynamic_timer_tag='',
+ windows=(self._window, ),
+ clear_bit=False,
+ fire_timestamp=ts,
+ hold_timestamp=ts,
+ paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
ts, (self._window, )))
def clear(self):
# type: () -> None
dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
+ # TODO(BEAM-9562): Plumb through actual paneinfo.
self._receiver.receive(
- windowed_value.WindowedValue((self._key, dict(timestamp=clear_ts)),
+ windowed_value.WindowedValue((
+ self._key,
+ userstate.Timer(
+ user_key='',
+ dynamic_timer_tag='',
+ windows=(self._window, ),
+ clear_bit=False,
+ fire_timestamp=timestamp.Timestamp.of(clear_ts),
+ hold_timestamp=timestamp.Timestamp.of(0),
+ paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
0, (self._window, )))
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 6d71dc5..6250f38 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -713,7 +713,7 @@ class DoOperation(Operation):
key, timer_data = windowed_timer.value
timer_spec = self.timer_specs[tag]
self.dofn_runner.process_user_timer(
- timer_spec, key, windowed_timer.windows[0], timer_data['timestamp'])
+ timer_spec, key, windowed_timer.windows[0], timer_data.fire_timestamp)
def finish(self):
# type: () -> None
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index f3ffa88..e91374f 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -26,6 +26,7 @@ from __future__ import absolute_import
import types
from builtins import object
+from collections import namedtuple
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
@@ -129,6 +130,18 @@ class CombiningValueStateSpec(StateSpec):
accumulator_coder_id=context.coders.get_id(self.coder)))
+Timer = namedtuple(
+ 'Timer',
+ 'user_key '
+ 'dynamic_timer_tag '
+ 'windows '
+ 'clear_bit '
+ 'fire_timestamp '
+ 'hold_timestamp '
+ 'paneinfo')
+
+
+# TODO(BEAM-9562): Plumb through actual key_coder and window_coder.
class TimerSpec(object):
"""Specification for a user stateful DoFn timer."""
prefix = "ts-"
@@ -148,7 +161,8 @@ class TimerSpec(object):
return beam_runner_api_pb2.TimerFamilySpec(
time_domain=TimeDomain.to_runner_api(self.time_domain),
timer_family_coder_id=context.coders.get_id(
- coders._TimerCoder(coders.SingletonCoder(None))))
+ coders._TimerCoder(
+ coders.StrUtf8Coder(), coders.GlobalWindowCoder())))
# TODO(BEAM-9602): Provide support for dynamic timer.