You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/11 22:12:16 UTC

[1/2] beam git commit: Use a consistent calculation for GC Time

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 e8e8a6e87 -> 74bcc0237


Use a consistent calculation for GC Time

Truncate all garbage collection timestamps so that they fall no later
than the end of the global window.

Add a reshuffle test, which was failing when late data arrived.

Update ReifyTimestamps to permit infinite skew. Elements that have
timestamps extracted from them may be late, but that is not the concern
of ReifyTimestamps.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/608629ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/608629ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/608629ad

Branch: refs/heads/release-2.0.0
Commit: 608629adf215348c802512c5ee22403055f8765a
Parents: e8e8a6e
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 11 09:26:30 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 15:12:06 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        |  2 +-
 .../apache/beam/runners/core/LateDataUtils.java | 33 ++++++-
 .../beam/runners/core/ReduceFnRunner.java       | 43 +++-------
 .../beam/runners/core/SimpleDoFnRunner.java     |  2 +-
 .../beam/runners/core/StatefulDoFnRunner.java   |  6 +-
 .../apache/beam/runners/core/WatermarkHold.java |  9 +-
 .../beam/runners/core/LateDataUtilsTest.java    | 90 ++++++++++++++++++++
 .../beam/sdk/transforms/ReifyTimestamps.java    |  6 ++
 .../apache/beam/sdk/transforms/Reshuffle.java   | 29 ++++---
 .../sdk/transforms/ReifyTimestampsTest.java     | 36 ++++++++
 .../beam/sdk/transforms/ReshuffleTest.java      | 27 ++++++
 11 files changed, 234 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 66385c1..570f524 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -159,7 +159,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
     /** Is {@code window} expired w.r.t. the garbage collection watermark? */
     private boolean canDropDueToExpiredWindow(BoundedWindow window) {
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+      return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index 732e60c..8a2b7c6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -24,15 +24,46 @@ import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
-
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * Utils to handle late data.
  */
 public class LateDataUtils {
+  private LateDataUtils() {}
+
+  /**
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
+   */
+  public static Instant garbageCollectionTime(
+      BoundedWindow window, WindowingStrategy windowingStrategy) {
+    return garbageCollectionTime(window, windowingStrategy.getAllowedLateness());
+  }
+
+  /**
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
+   */
+  public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) {
+
+    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+    // global window, then we truncate it. The conditional is phrased this way because the
+    // addition of EOW + allowed lateness might overflow the maximum allowed Instant.
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(allowedLateness)
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
+    } else {
+      return window.maxTimestamp().plus(allowedLateness);
+    }
+  }
 
   /**
    * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input elements.

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index d2ed835..62d519f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -663,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       W window = directContext.window();
       this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
-      Instant cleanupTime = garbageCollectionTime(window);
+      Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
       this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
     }
 
@@ -769,9 +768,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           // cleanup event and handled by the above).
           // Note we must do this even if the trigger is finished so that we are sure to cleanup
           // any final trigger finished bits.
-          checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+          checkState(
+              windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
               "Unexpected zero getAllowedLateness");
-          Instant cleanupTime = garbageCollectionTime(directContext.window());
+          Instant cleanupTime =
+              LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
           WindowTracing.debug(
               "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
                   + "inputWatermark:{}; outputWatermark:{}",
@@ -957,6 +958,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // Extract the window hold, and as a side effect clear it.
     final WatermarkHold.OldAndNewHolds pair =
         watermarkHold.extractAndRelease(renamedContext, isFinished).read();
+    // TODO: This isn't accurate if the elements are late. See BEAM-2262
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
@@ -972,11 +974,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       if (newHold.isAfter(directContext.window().maxTimestamp())) {
         // The hold must be for garbage collection, which can't have happened yet.
         checkState(
-          newHold.isEqual(garbageCollectionTime(directContext.window())),
-          "new hold %s should be at garbage collection for window %s plus %s",
-          newHold,
-          directContext.window(),
-          windowingStrategy.getAllowedLateness());
+            newHold.isEqual(
+                LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)),
+            "new hold %s should be at garbage collection for window %s plus %s",
+            newHold,
+            directContext.window(),
+            windowingStrategy.getAllowedLateness());
       } else {
         // The hold must be for the end-of-window, which can't have happened yet.
         checkState(
@@ -1042,7 +1045,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     String which;
     Instant timer;
     if (endOfWindow.isBefore(inputWM)) {
-      timer = garbageCollectionTime(directContext.window());
+      timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
       which = "garbage collection";
     } else {
       timer = endOfWindow;
@@ -1072,28 +1075,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         timerInternals.currentOutputWatermarkTime());
     Instant eow = directContext.window().maxTimestamp();
     directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
-    Instant gc = garbageCollectionTime(directContext.window());
+    Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
     if (gc.isAfter(eow)) {
       directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
     }
   }
 
-  /**
-   * Return when {@code window} should be garbage collected. If the window's expiration time is on
-   * or after the end of the global window, it will be truncated to the end of the global window.
-   */
-  private Instant garbageCollectionTime(W window) {
-
-    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
-    // global window, then we truncate it. The conditional is phrased like it is because the
-    // addition of EOW + allowed lateness might even overflow the maximum allowed Instant
-    if (GlobalWindow.INSTANCE
-        .maxTimestamp()
-        .minus(windowingStrategy.getAllowedLateness())
-        .isBefore(window.maxTimestamp())) {
-      return GlobalWindow.INSTANCE.maxTimestamp();
-    } else {
-      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 85423c0..7ca305e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -949,7 +949,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
      */
     private Instant minTargetAndGcTime(Instant target) {
       if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness);
         if (target.isAfter(windowExpiry)) {
           return windowExpiry;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 28a9dee..c68a943 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -104,7 +104,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   }
 
   private boolean isLate(BoundedWindow window) {
-    Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+    Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
     Instant inputWM = cleanupTimer.currentInputWatermarkTime();
     return gcTime.isBefore(inputWM);
   }
@@ -208,7 +208,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
     @Override
     public void setForWindow(BoundedWindow window) {
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
       // make sure this fires after any window.maxTimestamp() timers
       gcTime = gcTime.plus(GC_DELAY_MS);
       timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
@@ -222,7 +222,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
         Instant timestamp,
         TimeDomain timeDomain) {
       boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
       gcTime = gcTime.plus(GC_DELAY_MS);
       return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 64f5d9b..13e4c43 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -365,8 +365,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
       ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant eow = context.window().maxTimestamp();
-    Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
+    Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);
 
     if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
       WindowTracing.trace(
@@ -387,6 +386,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
       return null;
     }
 
+    if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      // If the garbage collection hold is past the timestamp we can represent, instead truncate
+      // to the maximum timestamp that is not positive infinity. This ensures all windows will
+      // eventually be garbage collected.
+      gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
+    }
     checkState(!gcHold.isBefore(inputWM),
         "Garbage collection hold %s cannot be before input watermark %s",
         gcHold, inputWM);

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
new file mode 100644
index 0000000..f0f315d
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LateDataUtils}.
+ */
+@RunWith(JUnit4.class)
+public class LateDataUtilsTest {
+  @Test
+  public void beforeEndOfGlobalWindowSame() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    Duration allowedLateness = Duration.standardMinutes(2);
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn)
+            .withAllowedLateness(allowedLateness);
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(10));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(window.maxTimestamp().plus(allowedLateness)));
+  }
+
+  @Test
+  public void garbageCollectionTimeAfterEndOfGlobalWindow() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn);
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(
+        window.maxTimestamp(),
+        Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
+  }
+
+  @Test
+  public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    Duration allowedLateness = Duration.millis(Long.MAX_VALUE);
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn)
+            .withAllowedLateness(allowedLateness);
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(-100));
+    assertThat(
+        window.maxTimestamp().plus(allowedLateness),
+        Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
index 0b1ab25..990f235 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
@@ -63,6 +64,11 @@ class ReifyTimestamps {
 
   private static class ExtractTimestampedValueDoFn<K, V>
       extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(Long.MAX_VALUE);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext context) {
       KV<K, TimestampedValue<V>> kv = context.element();

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 5394826..3b7122c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -71,22 +71,27 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
-    return input.apply(rewindow)
+    return input
+        .apply(rewindow)
         .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
         .apply(GroupByKey.<K, TimestampedValue<V>>create())
         // Set the windowing strategy directly, so that it doesn't get counted as the user having
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
-        .apply("ExpandIterable", ParDo.of(
-            new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                K key = c.element().getKey();
-                for (TimestampedValue<V> value : c.element().getValue()) {
-                  c.output(KV.of(key, value));
-                }
-              }
-            }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
+        .apply(
+            "ExpandIterable",
+            ParDo.of(
+                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    K key = c.element().getKey();
+                    for (TimestampedValue<V> value : c.element().getValue()) {
+                      c.output(KV.of(key, value));
+                    }
+                  }
+                }))
+        .apply(
+            "RestoreOriginalTimestamps",
+            ReifyTimestamps.<K, V>extractFromValues());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
index 181433e..e872842 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
@@ -101,4 +101,40 @@ public class ReifyTimestampsTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void extractFromValuesWhenValueTimestampedLaterSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(0, new Instant((0)))), new Instant(100)),
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(1, new Instant(1))), new Instant(101L)),
+                TimestampedValue.of(
+                    KV.of("bar", TimestampedValue.of(2, new Instant(2))), new Instant(102L)),
+                TimestampedValue.of(
+                    KV.of("baz", TimestampedValue.of(3, new Instant(3))), new Instant(103L))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/608629ad/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index 1038fd6..3cd7cf9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -27,8 +27,11 @@ import java.util.List;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -263,4 +266,28 @@ public class ReshuffleTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testReshuffleWithTimestampsStreaming() {
+    TestStream<Long> stream =
+        TestStream.create(VarLongCoder.of())
+            .advanceWatermarkTo(new Instant(0L).plus(Duration.standardDays(48L)))
+            .addElements(
+                TimestampedValue.of(0L, new Instant(0L)),
+                TimestampedValue.of(1L, new Instant(0L).plus(Duration.standardDays(48L))),
+                TimestampedValue.of(
+                    2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L))))
+            .advanceWatermarkToInfinity();
+    PCollection<KV<String, Long>> input =
+        pipeline
+            .apply(stream).apply(WithKeys.<String, Long>of(""))
+            .apply(
+                Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10L))));
+
+    PCollection<KV<String, Long>> reshuffled = input.apply(Reshuffle.<String, Long>of());
+    PAssert.that(reshuffled.apply(Values.<Long>create())).containsInAnyOrder(0L, 1L, 2L);
+
+    pipeline.run();
+  }
 }


[2/2] beam git commit: This closes #3081

Posted by dh...@apache.org.
This closes #3081


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74bcc023
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74bcc023
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74bcc023

Branch: refs/heads/release-2.0.0
Commit: 74bcc02377ed85ba95efe3c9c65d1cb2afdd053c
Parents: e8e8a6e 608629a
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 11 15:12:10 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 11 15:12:10 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        |  2 +-
 .../apache/beam/runners/core/LateDataUtils.java | 33 ++++++-
 .../beam/runners/core/ReduceFnRunner.java       | 43 +++-------
 .../beam/runners/core/SimpleDoFnRunner.java     |  2 +-
 .../beam/runners/core/StatefulDoFnRunner.java   |  6 +-
 .../apache/beam/runners/core/WatermarkHold.java |  9 +-
 .../beam/runners/core/LateDataUtilsTest.java    | 90 ++++++++++++++++++++
 .../beam/sdk/transforms/ReifyTimestamps.java    |  6 ++
 .../apache/beam/sdk/transforms/Reshuffle.java   | 29 ++++---
 .../sdk/transforms/ReifyTimestampsTest.java     | 36 ++++++++
 .../beam/sdk/transforms/ReshuffleTest.java      | 27 ++++++
 11 files changed, 234 insertions(+), 49 deletions(-)
----------------------------------------------------------------------