You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/22 01:04:39 UTC

[1/2] beam git commit: Fix Output Windows in OnTimerContext

Repository: beam
Updated Branches:
  refs/heads/master 7e97820c5 -> e1dc7a861


Fix Output Windows in OnTimerContext

When a User timer is delivered, output elements produced by that timer
firing should be placed within the same window as the timer is in.

Deliver timers in the window of thier namespace in the DirectRunner.

Test that timer deliveries maintain the window the timer was emitted in.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75100f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75100f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75100f8b

Branch: refs/heads/master
Commit: 75100f8bddb957522b4ce1a9e3cc2a4d60b2527c
Parents: 7e97820
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 21 11:26:09 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 21 18:03:51 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 13 ++++--
 .../direct/StatefulParDoEvaluatorFactory.java   | 17 +++++---
 .../apache/beam/sdk/transforms/ParDoTest.java   | 45 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2b93ca0..f5a559c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -766,22 +767,26 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     @Override
     public void output(OutputT output) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 0ad40ac..77bebb2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -228,15 +230,20 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
     @Override
     public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
         throws Exception {
-
-      BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());
-
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
 
       for (TimerData timer : gbkResult.getValue().timersIterable()) {
-        delegateEvaluator.onTimer(timer, window);
+        checkState(
+            timer.getNamespace() instanceof WindowNamespace,
+            "Expected Timer %s to be in a %s, but got %s",
+            timer,
+            WindowNamespace.class.getSimpleName(),
+            timer.getNamespace().getClass().getName());
+        WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
+        BoundedWindow timerWindow = windowNamespace.getWindow();
+        delegateEvaluator.onTimer(timer, timerWindow);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e58f78e..d5786f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -101,6 +101,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -1952,6 +1953,50 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testTimerReceivedInOriginalWindow() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, BoundedWindow> fn =
+        new DoFn<KV<String, Integer>, BoundedWindow>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context, BoundedWindow window) {
+            context.output(context.window());
+          }
+
+          public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
+            return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
+          }
+        };
+
+    SlidingWindows windowing =
+        SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
+    PCollection<BoundedWindow> output =
+        pipeline
+            .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
+            .apply(Window.<KV<String, Integer>>into(windowing))
+            .apply(ParDo.of(fn));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
+    pipeline.run();
+  }
+
   /**
    * Tests that an event time timer set absolutely for the last possible moment fires and results in
    * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.


[2/2] beam git commit: This closes #2285

Posted by tg...@apache.org.
This closes #2285


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1dc7a86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1dc7a86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1dc7a86

Branch: refs/heads/master
Commit: e1dc7a861bc53432430e65d962241bb1ed7098ca
Parents: 7e97820 75100f8
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 21 18:04:23 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 21 18:04:23 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 13 ++++--
 .../direct/StatefulParDoEvaluatorFactory.java   | 17 +++++---
 .../apache/beam/sdk/transforms/ParDoTest.java   | 45 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------