You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/11/20 00:28:06 UTC
[beam] branch master updated: Fix Windmill timer tag output timestamp decoding
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c4ac644 Fix Windmill timer tag output timestamp decoding
new 92ea33e Merge pull request #13373 from nehsyc/fix_timestamp_decoding
c4ac644 is described below
commit c4ac644ccd121a5868383aa18397069d8ea1174e
Author: sychen <sy...@google.com>
AuthorDate: Tue Nov 17 22:24:26 2020 -0800
Fix Windmill timer tag output timestamp decoding
---
.../runners/dataflow/worker/WindmillTimerInternals.java | 13 +++++++------
.../runners/dataflow/worker/WindmillTimerInternalsTest.java | 4 +++-
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index d56e9fe..a102e17 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -329,12 +329,10 @@ class WindmillTimerInternals implements TimerInternals {
? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
: "";
- // Parse the output timestamp.
+ // Parse the output timestamp. Not using '+' as a terminator because the output timestamp is the
+ // last segment in the tag and the timestamp encoding itself may contain '+'.
int outputTimestampStart = timerFamilyEnd + 1;
- int outputTimestampEnd = outputTimestampStart;
- while (outputTimestampEnd < tag.size() && tag.byteAt(outputTimestampEnd) != '+') {
- outputTimestampEnd++;
- }
+ int outputTimestampEnd = tag.size();
// For backwards compatibility, handle the case where the output timestamp isn't present.
Instant outputTimestamp = timestamp;
@@ -386,7 +384,10 @@ class WindmillTimerInternals implements TimerInternals {
.append(timerData.getTimerFamilyId())
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
- // Only encode the extra 9 bytes if the output timestamp is different than the timestamp;
+ // Only encode the extra 9 bytes if the output timestamp is different than the timestamp.
+ // NOTE: If we are going to add more information after the output timestamp in the tag, we
+ // should avoid using '+' as a separator since the timestamp may contain '+' in the
+ // encoding.
if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
out.write('+');
VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index 3482462..c9fe16c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -60,7 +60,9 @@ public class WindmillTimerInternalsTest {
BoundedWindow.TIMESTAMP_MAX_VALUE,
GlobalWindow.INSTANCE.maxTimestamp(),
new Instant(0),
- new Instant(127));
+ new Instant(127),
+ // The encoding of Instant(716000) ends with '+'.
+ new Instant(716001));
private static final List<String> TEST_STATE_FAMILIES = ImmutableList.of("", "F24");