You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:50:22 UTC

[48/51] [abbrv] incubator-beam git commit: Use informative Instant formatter in WatermarkHold

Use informative Instant formatter in WatermarkHold


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6

Branch: refs/heads/python-sdk
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/WatermarkHold.java  |  4 +++-
 .../sdk/transforms/windowing/BoundedWindow.java  | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
     checkState(!shifted.isBefore(timestamp),
         "OutputTimeFn moved element from %s to earlier time %s for window %s",
-        timestamp, shifted, window);
+        BoundedWindow.formatTimestamp(timestamp),
+        BoundedWindow.formatTimestamp(shifted),
+        window);
     checkState(timestamp.isAfter(window.maxTimestamp())
             || !shifted.isAfter(window.maxTimestamp()),
         "OutputTimeFn moved element from %s to %s which is beyond end of "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as "-infinity".
+   *
+   * <p>This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as "+infinity".
+   *
+   * <p>This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  /**
+   * Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
+   * whether the timestamp is the end of the global window or one of the distinguished values {@link
+   * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MIN_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
     if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
       return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";