You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:50:22 UTC
[48/51] [abbrv] incubator-beam git commit: Use informative Instant
formatter in WatermarkHold
Use informative Instant formatter in WatermarkHold
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6
Branch: refs/heads/python-sdk
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800
----------------------------------------------------------------------
.../apache/beam/runners/core/WatermarkHold.java | 4 +++-
.../sdk/transforms/windowing/BoundedWindow.java | 19 +++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
checkState(!shifted.isBefore(timestamp),
"OutputTimeFn moved element from %s to earlier time %s for window %s",
- timestamp, shifted, window);
+ BoundedWindow.formatTimestamp(timestamp),
+ BoundedWindow.formatTimestamp(shifted),
+ window);
checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
"OutputTimeFn moved element from %s to %s which is beyond end of "
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
public abstract class BoundedWindow {
// The min and max timestamps that won't overflow when they are converted to
// usec.
+
+ /**
+ * The minimum value for any Beam timestamp. Often referred to as "-infinity".
+ *
+ * <p>This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+ * microseconds-since-epoch can be safely represented with a {@code long}.
+ */
public static final Instant TIMESTAMP_MIN_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+ /**
+ * The maximum value for any Beam timestamp. Often referred to as "+infinity".
+ *
+ * <p>This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+ * microseconds-since-epoch can be safely represented with a {@code long}.
+ */
public static final Instant TIMESTAMP_MAX_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+ /**
* Formats an {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
+ * whether the timestamp is the end of the global window or one of the distinguished values {@link
* #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+ */
public static String formatTimestamp(Instant timestamp) {
if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";