You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/06/15 04:32:19 UTC

[beam] branch master updated: Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ba01293ba31 Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
ba01293ba31 is described below

commit ba01293ba31fdac351f93521443dd91eef56a2ff
Author: Reuven Lax <re...@google.com>
AuthorDate: Tue Jun 14 21:32:12 2022 -0700

    Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
---
 .../dataflow/worker/WindmillTimerInternals.java    | 55 ++++++++++++----------
 .../worker/StreamingDataflowWorkerTest.java        |  1 +
 .../worker/windmill/src/main/proto/windmill.proto  |  2 +
 3 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 658e74ee6c1..c1f89c679d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -210,22 +210,31 @@ class WindmillTimerInternals implements TimerInternals {
         // Setting the timer. If it is a user timer, set a hold.
 
         // Only set a hold if it's needed and if the hold is before the end of the global window.
-        if (needsWatermarkHold(timerData)
-            && timerData
-                .getOutputTimestamp()
-                .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
-          // Setting a timer, clear any prior hold and set to the new value
-          outputBuilder
-              .addWatermarkHoldsBuilder()
-              .setTag(timerHoldTag(prefix, timerData))
-              .setStateFamily(stateFamily)
-              .setReset(true)
-              .addTimestamps(
-                  WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+        if (needsWatermarkHold(timerData)) {
+          if (timerData
+              .getOutputTimestamp()
+              .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
+            // Setting a timer, clear any prior hold and set to the new value
+            outputBuilder
+                .addWatermarkHoldsBuilder()
+                .setTag(timerHoldTag(prefix, timerData))
+                .setStateFamily(stateFamily)
+                .setReset(true)
+                .addTimestamps(
+                    WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+          } else {
+            // Clear the hold in case a previous iteration of this timer set one.
+            outputBuilder
+                .addWatermarkHoldsBuilder()
+                .setTag(timerHoldTag(prefix, timerData))
+                .setStateFamily(stateFamily)
+                .setReset(true);
+          }
         }
       } else {
         // Deleting a timer. If it is a user timer, clear the hold
         timer.clearTimestamp();
+        timer.clearMetadataTimestamp();
         // Clear the hold even if it's the end of the global window in order to maintain update
         // compatibility.
         if (needsWatermarkHold(timerData)) {
@@ -276,6 +285,9 @@ class WindmillTimerInternals implements TimerInternals {
 
     builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
 
+    // Store the output timestamp in the metadata timestamp.
+    builder.setMetadataTimestamp(
+        WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
     return builder;
   }
 
@@ -344,8 +356,10 @@ class WindmillTimerInternals implements TimerInternals {
             ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
             : "";
 
-    // Parse the output timestamp. Not using '+' as a terminator because the output timestamp is the
-    // last segment in the tag and the timestamp encoding itself may contain '+'.
+    // For backwards compatibility, parse the output timestamp from the tag. We do not use '+'
+    // as a terminator because the output timestamp is the last segment in the tag and the
+    // timestamp encoding itself may contain '+'. If the tag carries no output timestamp, the
+    // timer's metadata timestamp (when present) is used instead.
     int outputTimestampStart = timerFamilyEnd + 1;
     int outputTimestampEnd = tag.size();
 
@@ -360,6 +374,8 @@ class WindmillTimerInternals implements TimerInternals {
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+    } else if (timer.hasMetadataTimestamp()) {
+      outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
     }
 
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
@@ -373,8 +389,7 @@ class WindmillTimerInternals implements TimerInternals {
   }
 
   private static boolean useNewTimerTagEncoding(TimerData timerData) {
-    return !timerData.getTimerFamilyId().isEmpty()
-        || !timerData.getOutputTimestamp().equals(timerData.getTimestamp());
+    return !timerData.getTimerFamilyId().isEmpty();
   }
 
   /**
@@ -399,14 +414,6 @@ class WindmillTimerInternals implements TimerInternals {
                 .append(timerData.getTimerFamilyId())
                 .toString();
         out.write(tagString.getBytes(StandardCharsets.UTF_8));
-        // Only encode the extra 9 bytes if the output timestamp is different than the timestamp.
-        // NOTE: If we are going to add more information after the output timestamp in the tag, we
-        // should avoid using '+' as a separator since the timestamp may contain '+' in the
-        // encoding.
-        if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
-          out.write('+');
-          VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
-        }
       } else {
         // For timers without a timerFamily, the timerFamily is an empty string
         tagString =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index c383e5d16e5..540b15061af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -1427,6 +1427,7 @@ public class StreamingDataflowWorkerTest {
             .setStateFamily("MergeWindows");
     if (!delete) {
       builder.setTimestamp(timestampMillis * 1000);
+      builder.setMetadataTimestamp(timestampMillis * 1000);
     }
     return builder.build();
   }
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index fdb37ba0697..c43a1f006df 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -42,6 +42,8 @@ message Timer {
   }
   optional Type type = 3 [default = WATERMARK];
   optional string state_family = 4;
+  optional int64 metadata_timestamp = 5 [default = 0x7fffffffffffffff];
+  optional bytes metadata_payload = 6;
 }
 
 message InputMessageBundle {