You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/11/20 00:28:06 UTC

[beam] branch master updated: Fix Windmill timer tag output timestamp decoding

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ac644  Fix Windmill timer tag output timestamp decoding
     new 92ea33e  Merge pull request #13373 from nehsyc/fix_timestamp_decoding
c4ac644 is described below

commit c4ac644ccd121a5868383aa18397069d8ea1174e
Author: sychen <sy...@google.com>
AuthorDate: Tue Nov 17 22:24:26 2020 -0800

    Fix Windmill timer tag output timestamp decoding
---
 .../runners/dataflow/worker/WindmillTimerInternals.java     | 13 +++++++------
 .../runners/dataflow/worker/WindmillTimerInternalsTest.java |  4 +++-
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index d56e9fe..a102e17 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -329,12 +329,10 @@ class WindmillTimerInternals implements TimerInternals {
             ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
             : "";
 
-    // Parse the output timestamp.
+    // Parse the output timestamp. Not using '+' as a terminator because the output timestamp is the
+    // last segment in the tag and the timestamp encoding itself may contain '+'.
     int outputTimestampStart = timerFamilyEnd + 1;
-    int outputTimestampEnd = outputTimestampStart;
-    while (outputTimestampEnd < tag.size() && tag.byteAt(outputTimestampEnd) != '+') {
-      outputTimestampEnd++;
-    }
+    int outputTimestampEnd = tag.size();
 
     // For backwards compatibility, handle the case were the output timestamp isn't present.
     Instant outputTimestamp = timestamp;
@@ -386,7 +384,10 @@ class WindmillTimerInternals implements TimerInternals {
                 .append(timerData.getTimerFamilyId())
                 .toString();
         out.write(tagString.getBytes(StandardCharsets.UTF_8));
-        // Only encode the extra 9 bytes if the output timestamp is different than the timestamp;
+        // Only encode the extra 9 bytes if the output timestamp is different than the timestamp.
+        // NOTE: If we are going to add more information after the output timestamp in the tag, we
+        // should avoid using '+' as a separator since the timestamp may contain '+' in the
+        // encoding.
         if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
           out.write('+');
           VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index 3482462..c9fe16c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -60,7 +60,9 @@ public class WindmillTimerInternalsTest {
           BoundedWindow.TIMESTAMP_MAX_VALUE,
           GlobalWindow.INSTANCE.maxTimestamp(),
           new Instant(0),
-          new Instant(127));
+          new Instant(127),
+          // The encoding of Instant(716000) ends with '+'.
+          new Instant(716001));
 
   private static final List<String> TEST_STATE_FAMILIES = ImmutableList.of("", "F24");