You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/06/15 04:32:19 UTC
[beam] branch master updated: Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ba01293ba31 Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
ba01293ba31 is described below
commit ba01293ba31fdac351f93521443dd91eef56a2ff
Author: Reuven Lax <re...@google.com>
AuthorDate: Tue Jun 14 21:32:12 2022 -0700
Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
---
.../dataflow/worker/WindmillTimerInternals.java | 55 ++++++++++++----------
.../worker/StreamingDataflowWorkerTest.java | 1 +
.../worker/windmill/src/main/proto/windmill.proto | 2 +
3 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 658e74ee6c1..c1f89c679d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -210,22 +210,31 @@ class WindmillTimerInternals implements TimerInternals {
// Setting the timer. If it is a user timer, set a hold.
// Only set a hold if it's needed and if the hold is before the end of the global window.
- if (needsWatermarkHold(timerData)
- && timerData
- .getOutputTimestamp()
- .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
- // Setting a timer, clear any prior hold and set to the new value
- outputBuilder
- .addWatermarkHoldsBuilder()
- .setTag(timerHoldTag(prefix, timerData))
- .setStateFamily(stateFamily)
- .setReset(true)
- .addTimestamps(
- WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+ if (needsWatermarkHold(timerData)) {
+ if (timerData
+ .getOutputTimestamp()
+ .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
+ // Setting a timer, clear any prior hold and set to the new value
+ outputBuilder
+ .addWatermarkHoldsBuilder()
+ .setTag(timerHoldTag(prefix, timerData))
+ .setStateFamily(stateFamily)
+ .setReset(true)
+ .addTimestamps(
+ WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+ } else {
+ // Clear the hold in case a previous iteration of this timer set one.
+ outputBuilder
+ .addWatermarkHoldsBuilder()
+ .setTag(timerHoldTag(prefix, timerData))
+ .setStateFamily(stateFamily)
+ .setReset(true);
+ }
}
} else {
// Deleting a timer. If it is a user timer, clear the hold
timer.clearTimestamp();
+ timer.clearMetadataTimestamp();
// Clear the hold even if it's the end of the global window in order to maintain update
// compatibility.
if (needsWatermarkHold(timerData)) {
@@ -276,6 +285,9 @@ class WindmillTimerInternals implements TimerInternals {
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
+ // Store the output timestamp in the metadata timestamp.
+ builder.setMetadataTimestamp(
+ WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
return builder;
}
@@ -344,8 +356,10 @@ class WindmillTimerInternals implements TimerInternals {
? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
: "";
- // Parse the output timestamp. Not using '+' as a terminator because the output timestamp is the
- // last segment in the tag and the timestamp encoding itself may contain '+'.
+ // For backwards compatibility, parse the output timestamp from the tag.
+ // Not using '+' as a terminator because the output timestamp is the
+ // last segment in the tag and the timestamp encoding itself may
+ // contain '+'.
int outputTimestampStart = timerFamilyEnd + 1;
int outputTimestampEnd = tag.size();
@@ -360,6 +374,8 @@ class WindmillTimerInternals implements TimerInternals {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ } else if (timer.hasMetadataTimestamp()) {
+ outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
}
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
@@ -373,8 +389,7 @@ class WindmillTimerInternals implements TimerInternals {
}
private static boolean useNewTimerTagEncoding(TimerData timerData) {
- return !timerData.getTimerFamilyId().isEmpty()
- || !timerData.getOutputTimestamp().equals(timerData.getTimestamp());
+ return !timerData.getTimerFamilyId().isEmpty();
}
/**
@@ -399,14 +414,6 @@ class WindmillTimerInternals implements TimerInternals {
.append(timerData.getTimerFamilyId())
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
- // Only encode the extra 9 bytes if the output timestamp is different than the timestamp.
- // NOTE: If we are going to add more information after the output timestamp in the tag, we
- // should avoid using '+' as a separator since the timestamp may contain '+' in the
- // encoding.
- if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
- out.write('+');
- VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
- }
} else {
// Timers without a timerFamily have an empty string as their timerFamily
tagString =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index c383e5d16e5..540b15061af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -1427,6 +1427,7 @@ public class StreamingDataflowWorkerTest {
.setStateFamily("MergeWindows");
if (!delete) {
builder.setTimestamp(timestampMillis * 1000);
+ builder.setMetadataTimestamp(timestampMillis * 1000);
}
return builder.build();
}
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index fdb37ba0697..c43a1f006df 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -42,6 +42,8 @@ message Timer {
}
optional Type type = 3 [default = WATERMARK];
optional string state_family = 4;
+ optional int64 metadata_timestamp = 5 [default = 0x7fffffffffffffff];
+ optional bytes metadata_payload = 6;
}
message InputMessageBundle {