You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:25 UTC

[1/7] beam git commit: Extracts interface from PushbackSideInputDoFnRunner

Repository: beam
Updated Branches:
  refs/heads/master a9bcc8b15 -> e0df7d85e


Extracts interface from PushbackSideInputDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e1a2675
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e1a2675
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e1a2675

Branch: refs/heads/master
Commit: 7e1a2675699ef14291e8c112010be66fff4b8581
Parents: 1cc16b0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:41:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   3 +-
 .../core/PushbackSideInputDoFnRunner.java       | 106 +------
 .../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++
 .../core/PushbackSideInputDoFnRunnerTest.java   | 282 -------------------
 .../SimplePushbackSideInputDoFnRunnerTest.java  | 282 +++++++++++++++++++
 .../beam/runners/direct/ParDoEvaluator.java     |   3 +-
 .../wrappers/streaming/DoFnOperator.java        |  12 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 9 files changed, 424 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index bad5be2..52d1d43 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -48,6 +48,7 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
@@ -368,7 +369,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     }
 
     pushbackDoFnRunner =
-        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+        SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 4ad20b5..bab1dc7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -17,113 +17,35 @@
  */
 package org.apache.beam.runners.core;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 
 /**
- * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
- * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ * Interface for runners of {@link DoFn DoFns} that support pushback when reading side inputs,
+ * i.e. that return elements which could not be processed because they require reading a side
+ * input window that is not ready.
  */
-public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-  private final DoFnRunner<InputT, OutputT> underlying;
-  private final Collection<PCollectionView<?>> views;
-  private final ReadyCheckingSideInputReader sideInputReader;
-
-  private Set<BoundedWindow> notReadyWindows;
-
-  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
-      DoFnRunner<InputT, OutputT> underlying,
-      Collection<PCollectionView<?>> views,
-      ReadyCheckingSideInputReader sideInputReader) {
-    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
-  }
-
-  private PushbackSideInputDoFnRunner(
-      DoFnRunner<InputT, OutputT> underlying,
-      Collection<PCollectionView<?>> views,
-      ReadyCheckingSideInputReader sideInputReader) {
-    this.underlying = underlying;
-    this.views = views;
-    this.sideInputReader = sideInputReader;
-  }
-
-  @Override
-  public void startBundle() {
-    notReadyWindows = new HashSet<>();
-    underlying.startBundle();
-  }
+public interface PushbackSideInputDoFnRunner<InputT, OutputT> {
+  /** Calls the underlying {@link DoFn.StartBundle} method. */
+  void startBundle();
 
   /**
-   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * Calls the underlying {@link DoFn.ProcessElement} method for the provided element
    * for each window the element is in that is ready.
    *
    * @param elem the element to process in all ready windows
    * @return each element that could not be processed because it requires a side input window
    * that is not ready.
    */
-  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
-    if (views.isEmpty()) {
-      // When there are no side inputs, we can preserve the compressed representation.
-      processElement(elem);
-      return Collections.emptyList();
-    }
-    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
-    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
-      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
-      if (isReady(mainInputWindow)) {
-        // When there are any side inputs, we have to process the element in each window
-        // individually, to disambiguate access to per-window side inputs.
-        processElement(windowElem);
-      } else {
-        notReadyWindows.add(mainInputWindow);
-        pushedBack.add(windowElem);
-      }
-    }
-    return pushedBack.build();
-  }
-
-  private boolean isReady(BoundedWindow mainInputWindow) {
-    if (notReadyWindows.contains(mainInputWindow)) {
-      return false;
-    }
-    for (PCollectionView<?> view : views) {
-      BoundedWindow sideInputWindow =
-          view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
-      if (!sideInputReader.isReady(view, sideInputWindow)) {
-        return false;
-      }
-    }
-    return true;
-  }
+  Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);
 
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    underlying.processElement(elem);
-  }
+  /** Calls the underlying {@link DoFn.OnTimer} method. */
+  void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+               TimeDomain timeDomain);
 
-  @Override
-  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
-      TimeDomain timeDomain) {
-    underlying.onTimer(timerId, window, timestamp, timeDomain);
-  }
-
-  /**
-   * Call the underlying {@link DoFnRunner#finishBundle()}.
-   */
-  @Override
-  public void finishBundle() {
-    notReadyWindows = null;
-    underlying.finishBundle();
-  }
+  /** Calls the underlying {@link DoFn.FinishBundle} method. */
+  void finishBundle();
 }
-

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..50d301b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PushbackSideInputDoFnRunner} backed by a {@link DoFnRunner}; elements that are not
+ * ready are returned by {@link #processElementInReadyWindows(WindowedValue)} unprocessed.
+ */
+public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
+    implements PushbackSideInputDoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  private Set<BoundedWindow> notReadyWindows;
+
+  public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new SimplePushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private SimplePushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  @Override
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      // When there are no side inputs, we can preserve the compressed representation.
+      underlying.processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      if (isReady(mainInputWindow)) {
+        // When there are any side inputs, we have to process the element in each window
+        // individually, to disambiguate access to per-window side inputs.
+        underlying.processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    if (notReadyWindows.contains(mainInputWindow)) {
+      return false;
+    }
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow =
+          view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+                      TimeDomain timeDomain) {
+    underlying.onTimer(timerId, window, timestamp, timeDomain);
+  }
+
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
deleted file mode 100644
index cb057b8..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link PushbackSideInputDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class PushbackSideInputDoFnRunnerTest {
-  @Mock private ReadyCheckingSideInputReader reader;
-  private TestDoFnRunner<Integer, Integer> underlying;
-  private PCollectionView<Integer> singletonView;
-
-  @Rule
-  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    singletonView =
-        created
-            .apply(Window.into(new IdentitySideInputWindowFn()))
-            .apply(Sum.integersGlobally().asSingletonView());
-
-    underlying = new TestDoFnRunner<>();
-  }
-
-  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
-      ImmutableList<PCollectionView<?>> views) {
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        PushbackSideInputDoFnRunner.create(underlying, views, reader);
-    runner.startBundle();
-    return runner;
-  }
-
-  @Test
-  public void startFinishBundleDelegates() {
-    PushbackSideInputDoFnRunner runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    assertThat(underlying.started, is(true));
-    assertThat(underlying.finished, is(false));
-    runner.finishBundle();
-    assertThat(underlying.finished, is(true));
-  }
-
-  @Test
-  public void processElementSideInputNotReady() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(false);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    WindowedValue<Integer> oneWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            new IntervalWindow(new Instant(-500L), new Instant(0L)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> oneWindowPushback =
-        runner.processElementInReadyWindows(oneWindow);
-    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
-    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
-  }
-
-  @Test
-  public void processElementSideInputNotReadyMultipleWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(false);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
-    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
-  }
-
-  @Test
-  public void processElementSideInputNotReadySomeWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
-        .thenReturn(false);
-    when(
-            reader.isReady(
-                Mockito.eq(singletonView),
-                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
-        .thenReturn(true);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
-    IntervalWindow bigWindow =
-        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
-            PaneInfo.NO_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(
-        multiWindowPushback,
-        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
-    assertThat(
-        underlying.inputElems,
-        containsInAnyOrder(
-            WindowedValue.of(
-                2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
-            WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
-  }
-
-  @Test
-  public void processElementSideInputReadyAllWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(true);
-
-    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
-    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, emptyIterable());
-    assertThat(
-        underlying.inputElems,
-        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
-  }
-
-  @Test
-  public void processElementNoSideInputs() {
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of());
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, emptyIterable());
-    // Should preserve the compressed representation when there's no side inputs.
-    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
-  }
-
-  /** Tests that a call to onTimer gets delegated. */
-  @Test
-  public void testOnTimerCalled() {
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of());
-
-    String timerId = "fooTimer";
-    IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
-    Instant timestamp = new Instant(72);
-
-    // Mocking is not easily compatible with annotation analysis, so we manually record
-    // the method call.
-    runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
-
-    assertThat(
-        underlying.firedTimers,
-        contains(
-            TimerData.of(
-                timerId,
-                StateNamespaces.window(IntervalWindow.getCoder(), window),
-                timestamp,
-                TimeDomain.EVENT_TIME)));
-  }
-
-  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-    List<WindowedValue<InputT>> inputElems;
-    List<TimerData> firedTimers;
-    private boolean started = false;
-    private boolean finished = false;
-
-    @Override
-    public void startBundle() {
-      started = true;
-      inputElems = new ArrayList<>();
-      firedTimers = new ArrayList<>();
-    }
-
-    @Override
-    public void processElement(WindowedValue<InputT> elem) {
-      inputElems.add(elem);
-    }
-
-    @Override
-    public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
-        TimeDomain timeDomain) {
-      firedTimers.add(
-          TimerData.of(
-              timerId,
-              StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
-              timestamp,
-              timeDomain));
-    }
-
-    @Override
-    public void finishBundle() {
-      finished = true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..ba3f926
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link SimplePushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class SimplePushbackSideInputDoFnRunnerTest {
+  @Mock private ReadyCheckingSideInputReader reader;
+  private TestDoFnRunner<Integer, Integer> underlying;
+  private PCollectionView<Integer> singletonView;
+
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    singletonView =
+        created
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    underlying = new TestDoFnRunner<>();
+  }
+
+  private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+      ImmutableList<PCollectionView<?>> views) {
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+        SimplePushbackSideInputDoFnRunner.create(underlying, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  @Test
+  public void startFinishBundleDelegates() {
+    PushbackSideInputDoFnRunner runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    assertThat(underlying.started, is(true));
+    assertThat(underlying.finished, is(false));
+    runner.finishBundle();
+    assertThat(underlying.finished, is(true));
+  }
+
+  @Test
+  public void processElementSideInputNotReady() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> oneWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            new IntervalWindow(new Instant(-500L), new Instant(0L)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> oneWindowPushback =
+        runner.processElementInReadyWindows(oneWindow);
+    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadyMultipleWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadySomeWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+        .thenReturn(false);
+    when(
+            reader.isReady(
+                Mockito.eq(singletonView),
+                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+        .thenReturn(true);
+
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    IntervalWindow bigWindow =
+        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(
+        multiWindowPushback,
+        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+    assertThat(
+        underlying.inputElems,
+        containsInAnyOrder(
+            WindowedValue.of(
+                2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
+  }
+
+  @Test
+  public void processElementSideInputReadyAllWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(
+        underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+  }
+
+  @Test
+  public void processElementNoSideInputs() {
+    SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    // Should preserve the compressed representation when there's no side inputs.
+    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+  }
+
+  /** Tests that a call to onTimer gets delegated. */
+  @Test
+  public void testOnTimerCalled() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    String timerId = "fooTimer";
+    IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
+    Instant timestamp = new Instant(72);
+
+    // Mocking is not easily compatible with annotation analysis, so we manually record
+    // the method call.
+    runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+
+    assertThat(
+        underlying.firedTimers,
+        contains(
+            TimerData.of(
+                timerId,
+                StateNamespaces.window(IntervalWindow.getCoder(), window),
+                timestamp,
+                TimeDomain.EVENT_TIME)));
+  }
+
+  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+    List<WindowedValue<InputT>> inputElems;
+    List<TimerData> firedTimers;
+    private boolean started = false;
+    private boolean finished = false;
+
+    @Override
+    public void startBundle() {
+      started = true;
+      inputElems = new ArrayList<>();
+      firedTimers = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      inputElems.add(elem);
+    }
+
+    @Override
+    public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+        TimeDomain timeDomain) {
+      firedTimers.add(
+          TimerData.of(
+              timerId,
+              StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
+              timestamp,
+              timeDomain));
+    }
+
+    @Override
+    public void finishBundle() {
+      finished = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 131716f..bab7b2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -85,7 +86,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
             aggregatorChanges,
             windowingStrategy);
     PushbackSideInputDoFnRunner<InputT, OutputT> runner =
-        PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+        SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
 
     try {
       runner.startBundle();

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 5496f71..8a09286 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -119,6 +120,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected final OutputManagerFactory<OutputT> outputManagerFactory;
 
+  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
   protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
 
   protected transient SideInputHandler sideInputHandler;
@@ -269,7 +271,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     ExecutionContext.StepContext stepContext = createStepContext();
 
-    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
+    doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
         doFn,
         sideInputReader,
@@ -320,7 +322,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     pushbackDoFnRunner =
-        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+        SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
   }
 
   @Override
@@ -362,9 +364,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   @Override
   public final void processElement(
       StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
-    pushbackDoFnRunner.startBundle();
-    pushbackDoFnRunner.processElement(streamRecord.getValue());
-    pushbackDoFnRunner.finishBundle();
+    doFnRunner.startBundle();
+    doFnRunner.processElement(streamRecord.getValue());
+    doFnRunner.finishBundle();
   }
 
   private void setPushedBackWatermark(long watermark) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 1a636c9..40f70e4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -142,7 +142,7 @@ public class SplittableDoFnOperator<
 
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
-    pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
         KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
             (String) stateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));

http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 7b899f4..9b2136c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -108,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   @Override
   public void fireTimer(InternalTimer<?, TimerData> timer) {
-    pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
         KeyedWorkItems.<K, InputT>timersWorkItem(
             (K) stateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));


[7/7] beam git commit: This closes #2556

Posted by jk...@apache.org.
This closes #2556


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0df7d85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0df7d85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0df7d85

Branch: refs/heads/master
Commit: e0df7d85e80eac71f875663512bc293a0529460f
Parents: a9bcc8b 6ac3ac5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 18 18:02:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:25 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   3 +-
 .../apache/beam/runners/core/DoFnRunners.java   |  32 +++
 .../beam/runners/core/ProcessFnRunner.java      | 127 +++++++++
 .../core/PushbackSideInputDoFnRunner.java       | 106 +------
 .../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++
 .../beam/runners/core/SplittableParDo.java      | 110 +++++---
 .../core/PushbackSideInputDoFnRunnerTest.java   | 282 -------------------
 .../SimplePushbackSideInputDoFnRunnerTest.java  | 282 +++++++++++++++++++
 .../beam/runners/core/SplittableParDoTest.java  |  90 +++---
 ...ecycleManagerRemovingTransformEvaluator.java |   6 +-
 .../beam/runners/direct/ParDoEvaluator.java     | 127 ++++++---
 .../runners/direct/ParDoEvaluatorFactory.java   |  13 +-
 ...littableProcessElementsEvaluatorFactory.java | 106 +++++--
 .../direct/StatefulParDoEvaluatorFactory.java   |   4 +-
 .../direct/TransformEvaluatorRegistry.java      |   4 +-
 ...leManagerRemovingTransformEvaluatorTest.java |   8 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   7 +-
 runners/flink/runner/pom.xml                    |   3 +-
 .../wrappers/streaming/DoFnOperator.java        |  12 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 ...esSplittableParDoWithWindowedSideInputs.java |  26 ++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 104 +++++--
 23 files changed, 993 insertions(+), 578 deletions(-)
----------------------------------------------------------------------



[3/7] beam git commit: Separates side input test and side output test

Posted by jk...@apache.org.
Separates side input test and side output test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a51bdd26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a51bdd26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a51bdd26

Branch: refs/heads/master
Commit: a51bdd266f9c877cb407de986a465fc9c7de76ff
Parents: a9bcc8b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:38:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/SplittableDoFnTest.java | 63 ++++++++++++++------
 1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a51bdd26/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 9e8c12e..30329f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,7 +62,7 @@ import org.junit.runners.JUnit4;
  * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior.
  */
 @RunWith(JUnit4.class)
-public class SplittableDoFnTest {
+public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
@@ -216,22 +217,18 @@ public class SplittableDoFnTest {
     p.run();
   }
 
-  private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+  private static class SDFWithSideInput extends DoFn<Integer, String> {
     private final PCollectionView<String> sideInput;
-    private final TupleTag<String> additionalOutput;
 
-    private SDFWithSideInputsAndOutputs(
-        PCollectionView<String> sideInput, TupleTag<String> additionalOutput) {
+    private SDFWithSideInput(PCollectionView<String> sideInput) {
       this.sideInput = sideInput;
-      this.additionalOutput = additionalOutput;
     }
 
     @ProcessElement
     public void process(ProcessContext c, OffsetRangeTracker tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       String side = c.sideInput(sideInput);
-      c.output("main:" + side + ":" + c.element());
-      c.output(additionalOutput, "additional:" + side + ":" + c.element());
+      c.output(side + ":" + c.element());
     }
 
     @GetInitialRestriction
@@ -242,27 +239,55 @@ public class SplittableDoFnTest {
 
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
-  public void testSideInputsAndOutputs() throws Exception {
-
+  public void testSideInput() throws Exception {
     PCollectionView<String> sideInput =
         p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
-    TupleTag<String> mainOutputTag = new TupleTag<>("main");
-    TupleTag<String> additionalOutputTag = new TupleTag<>("additional");
+
+    PCollection<String> res =
+        p.apply("input", Create.of(0, 1, 2))
+            .apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+    PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2"));
+
+    p.run();
+  }
+
+  private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
+    private final TupleTag<String> additionalOutput;
+
+    private SDFWithAdditionalOutput(TupleTag<String> additionalOutput) {
+      this.additionalOutput = additionalOutput;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
+      c.output("main:" + c.element());
+      c.output(additionalOutput, "additional:" + c.element());
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Integer value) {
+      return new OffsetRange(0, 1);
+    }
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
+  public void testAdditionalOutput() throws Exception {
+    TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
+    TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};
 
     PCollectionTuple res =
         p.apply("input", Create.of(0, 1, 2))
             .apply(
-                ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag))
-                    .withSideInputs(sideInput)
+                ParDo.of(new SDFWithAdditionalOutput(additionalOutputTag))
                     .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
-    res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
-    res.get(additionalOutputTag).setCoder(StringUtf8Coder.of());
 
     PAssert.that(res.get(mainOutputTag))
-        .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
+        .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
     PAssert.that(res.get(additionalOutputTag))
-        .containsInAnyOrder(
-            Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2"));
+        .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));
 
     p.run();
   }


[5/7] beam git commit: Creates ProcessFnRunner and wires it through ParDoEvaluator

Posted by jk...@apache.org.
Creates ProcessFnRunner and wires it through ParDoEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b93de58f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b93de58f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b93de58f

Branch: refs/heads/master
Commit: b93de58f5a3a10877997815a793725cb0e53cc2d
Parents: 7e1a267
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:52:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |  32 +++++
 .../beam/runners/core/ProcessFnRunner.java      | 127 +++++++++++++++++++
 .../beam/runners/direct/ParDoEvaluator.java     | 114 +++++++++++++----
 .../runners/direct/ParDoEvaluatorFactory.java   |  11 +-
 ...littableProcessElementsEvaluatorFactory.java | 106 ++++++++++++----
 .../direct/StatefulParDoEvaluatorFactory.java   |   4 +-
 .../direct/TransformEvaluatorRegistry.java      |   4 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 8 files changed, 341 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..8501e72 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.runners.core;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -26,10 +28,12 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -146,4 +150,32 @@ public class DoFnRunners {
         stateCleaner,
         droppedDueToLateness);
   }
+
+  public static <InputT, OutputT, RestrictionT>
+  ProcessFnRunner<InputT, OutputT, RestrictionT>
+  newProcessFnRunner(
+      ProcessFn<InputT, OutputT, RestrictionT, ?> fn,
+      PipelineOptions options,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new ProcessFnRunner<>(
+        simpleRunner(
+            options,
+            fn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy),
+        views,
+        sideInputReader);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
new file mode 100644
index 0000000..3ae3f50
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.SplittableParDo.ProcessFn;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/** Runs a {@link ProcessFn} by constructing the appropriate contexts and passing them in. */
+public class ProcessFnRunner<InputT, OutputT, RestrictionT>
+    implements PushbackSideInputDoFnRunner<
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+  private final DoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  ProcessFnRunner(
+      DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    underlying.startBundle();
+  }
+
+  @Override
+  public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
+      processElementInReadyWindows(
+          WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+              windowedKWI) {
+    checkTrivialOuterWindows(windowedKWI);
+    BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
+    if (!isReady(window)) {
+      return Collections.singletonList(windowedKWI);
+    }
+    underlying.processElement(windowedKWI);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void finishBundle() {
+    underlying.finishBundle();
+  }
+
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
+  }
+
+  private static <T> void checkTrivialOuterWindows(
+      WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
+    // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or ValueInGlobalWindow)
+    Collection<? extends BoundedWindow> outerWindows = windowedKWI.getWindows();
+    if (!outerWindows.isEmpty()) {
+      checkArgument(
+          outerWindows.size() == 1,
+          "The KeyedWorkItem itself must not be in multiple windows, but was in: %s",
+          outerWindows);
+      BoundedWindow onlyWindow = Iterables.getOnlyElement(outerWindows);
+      checkArgument(
+          onlyWindow instanceof GlobalWindow,
+          "KeyedWorkItem must be in the Global window, but was in: %s",
+          onlyWindow);
+    }
+  }
+
+  private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) {
+    if (Iterables.isEmpty(kwi.elementsIterable())) {
+      // ProcessFn sets only a single timer.
+      TimerData timer = Iterables.getOnlyElement(kwi.timersIterable());
+      return ((WindowNamespace) timer.getNamespace()).getWindow();
+    } else {
+      // KWI must have a single element in elementsIterable, because it follows a GBK by a
+      // uniquely generated key.
+      // Additionally, windows must be exploded before GBKIntoKeyedWorkItems, so there's also
+      // only a single window.
+      WindowedValue<T> value = Iterables.getOnlyElement(kwi.elementsIterable());
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index bab7b2c..cab11db 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -43,6 +44,50 @@ import org.apache.beam.sdk.values.TupleTag;
 
 class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
+  public interface DoFnRunnerFactory<InputT, OutputT> {
+    PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+        PipelineOptions options,
+        DoFn<InputT, OutputT> fn,
+        List<PCollectionView<?>> sideInputs,
+        ReadyCheckingSideInputReader sideInputReader,
+        OutputManager outputManager,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> additionalOutputTags,
+        DirectStepContext stepContext,
+        AggregatorContainer.Mutator aggregatorChanges,
+        WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
+  }
+
+  public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
+    return new DoFnRunnerFactory<InputT, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+          PipelineOptions options,
+          DoFn<InputT, OutputT> fn,
+          List<PCollectionView<?>> sideInputs,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          DirectStepContext stepContext,
+          AggregatorContainer.Mutator aggregatorChanges,
+          WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        DoFnRunner<InputT, OutputT> underlying =
+            DoFnRunners.simpleRunner(
+                options,
+                fn,
+                sideInputReader,
+                outputManager,
+                mainOutputTag,
+                additionalOutputTags,
+                stepContext,
+                aggregatorChanges,
+                windowingStrategy);
+        return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+      }
+    };
+  }
+
   public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
@@ -53,9 +98,43 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
-      Map<TupleTag<?>, PCollection<?>> outputs) {
+      Map<TupleTag<?>, PCollection<?>> outputs,
+      DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
     AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
 
+    BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
+
+    ReadyCheckingSideInputReader sideInputReader =
+        evaluationContext.createSideInputReader(sideInputs);
+
+    PushbackSideInputDoFnRunner<InputT, OutputT> runner = runnerFactory.createRunner(
+        evaluationContext.getPipelineOptions(),
+        fn,
+        sideInputs,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        additionalOutputTags,
+        stepContext,
+        aggregatorChanges,
+        windowingStrategy);
+
+    return create(runner, stepContext, application, aggregatorChanges, outputManager);
+  }
+
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+      PushbackSideInputDoFnRunner<InputT, OutputT> runner,
+      DirectStepContext stepContext,
+      AppliedPTransform<?, ?, ?> application,
+      AggregatorContainer.Mutator aggregatorChanges,
+      BundleOutputManager outputManager) {
+    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+  }
+
+  static BundleOutputManager createOutputManager(
+      EvaluationContext evaluationContext,
+      StructuralKey<?> key,
+      Map<TupleTag<?>, PCollection<?>> outputs) {
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
       // Just trust the context's decision as to whether the output should be keyed.
@@ -69,32 +148,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
             outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
       }
     }
-    BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
-
-    ReadyCheckingSideInputReader sideInputReader =
-        evaluationContext.createSideInputReader(sideInputs);
-
-    DoFnRunner<InputT, OutputT> underlying =
-        DoFnRunners.simpleRunner(
-            evaluationContext.getPipelineOptions(),
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            additionalOutputTags,
-            stepContext,
-            aggregatorChanges,
-            windowingStrategy);
-    PushbackSideInputDoFnRunner<InputT, OutputT> runner =
-        SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
-    try {
-      runner.startBundle();
-    } catch (Exception e) {
-      throw UserCodeException.wrap(e);
-    }
-
-    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+    return BundleOutputManager.create(outputBundles);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -119,6 +173,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
     this.stepContext = stepContext;
     this.aggregatorChanges = aggregatorChanges;
     this.unprocessedElements = ImmutableList.builder();
+
+    try {
+      fnRunner.startBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
   }
 
   public BundleOutputManager getOutputManager() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 93f204a..b00c2b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -43,9 +43,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
   private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
+  private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
 
-  ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
+  ParDoEvaluatorFactory(
+      EvaluationContext evaluationContext,
+      ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
     this.evaluationContext = evaluationContext;
+    this.runnerFactory = runnerFactory;
     fnClones =
         CacheBuilder.newBuilder()
             .build(
@@ -148,7 +152,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           sideInputs,
           mainOutputTag,
           additionalOutputTags,
-          pcollections(application.getOutputs()));
+          pcollections(application.getOutputs()),
+          runnerFactory);
     } catch (Exception e) {
       try {
         fnManager.remove();
@@ -162,7 +167,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
   }
 
-  private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
+  static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
     Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
       pcs.put(output.getKey(), (PCollection<?>) output.getValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 00b16dd..7efdb52 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -18,25 +18,34 @@
 package org.apache.beam.runners.direct;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ElementAndRestriction;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -51,7 +60,11 @@ class SplittableProcessElementsEvaluatorFactory<
 
   SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
-    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+    this.delegateFactory =
+        new ParDoEvaluatorFactory<>(
+            evaluationContext,
+            SplittableProcessElementsEvaluatorFactory
+                .<InputT, OutputT, RestrictionT>processFnRunnerFactory());
   }
 
   @Override
@@ -82,12 +95,12 @@ class SplittableProcessElementsEvaluatorFactory<
     final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
-    SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+    ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
         transform.newProcessFn(transform.getFn());
 
     DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
     processFn =
-        ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+        ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
             fnManager
                 .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
                     get());
@@ -98,7 +111,7 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+    final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,
@@ -127,34 +140,36 @@ class SplittableProcessElementsEvaluatorFactory<
           }
         });
 
-    final OutputManager outputManager = parDoEvaluator.getOutputManager();
+    OutputWindowedValue<OutputT> outputWindowedValue =
+        new OutputWindowedValue<OutputT>() {
+          private final OutputManager outputManager = parDoEvaluator.getOutputManager();
+
+          @Override
+          public void outputWindowedValue(
+              OutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(
+                transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+          }
+
+          @Override
+          public <AdditionalOutputT> void outputWindowedValue(
+              TupleTag<AdditionalOutputT> tag,
+              AdditionalOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+          }
+        };
     processFn.setProcessElementInvoker(
         new OutputAndTimeBoundedSplittableProcessElementInvoker<
             InputT, OutputT, RestrictionT, TrackerT>(
             transform.getFn(),
             evaluationContext.getPipelineOptions(),
-            new OutputWindowedValue<OutputT>() {
-              @Override
-              public void outputWindowedValue(
-                  OutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(
-                    transform.getMainOutputTag(),
-                    WindowedValue.of(output, timestamp, windows, pane));
-              }
-
-              @Override
-              public <AdditionalOutputT> void outputWindowedValue(
-                  TupleTag<AdditionalOutputT> tag,
-                  AdditionalOutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
-              }
-            },
+            outputWindowedValue,
             evaluationContext.createSideInputReader(transform.getSideInputs()),
             // TODO: For better performance, use a higher-level executor?
             Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
@@ -163,4 +178,41 @@ class SplittableProcessElementsEvaluatorFactory<
 
     return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
   }
+
+  private static <InputT, OutputT, RestrictionT>
+  ParDoEvaluator.DoFnRunnerFactory<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          processFnRunnerFactory() {
+    return new ParDoEvaluator.DoFnRunnerFactory<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      createRunner(
+          PipelineOptions options,
+          DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
+          List<PCollectionView<?>> sideInputs,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          DirectExecutionContext.DirectStepContext stepContext,
+          AggregatorContainer.Mutator aggregatorChanges,
+          WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+            (ProcessFn) fn;
+        return DoFnRunners.newProcessFnRunner(
+            processFn,
+            options,
+            sideInputs,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            aggregatorChanges,
+            windowingStrategy);
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index be77ea1..8793ae8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -65,7 +65,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
   private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
 
   StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+    this.delegateFactory =
+        new ParDoEvaluatorFactory<>(
+            evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory());
     this.cleanupRegistry =
         CacheBuilder.newBuilder()
             .weakValues()

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index ae7ad93..d06c460 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -52,7 +52,9 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
         ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
             .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
-            .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
+            .put(
+                ParDo.MultiOutput.class,
+                new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory()))
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
             .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(WriteView.class, new ViewEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2be0f9d..e99e4bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -169,7 +169,8 @@ public class ParDoEvaluatorTest {
         ImmutableList.<PCollectionView<?>>of(singletonView),
         mainOutputTag,
         additionalOutputTags,
-        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output),
+        ParDoEvaluator.<Integer, Integer>defaultRunnerFactory());
   }
 
   private static class RecorderFn extends DoFn<Integer, Integer> {


[4/7] beam git commit: Minor cleanups in ParDoEvaluator

Posted by jk...@apache.org.
Minor cleanups in ParDoEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d

Branch: refs/heads/master
Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6
Parents: 3fd8890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 12:25:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 ...oFnLifecycleManagerRemovingTransformEvaluator.java |  6 +++---
 .../apache/beam/runners/direct/ParDoEvaluator.java    | 14 +++++---------
 .../beam/runners/direct/ParDoEvaluatorFactory.java    |  2 +-
 .../SplittableProcessElementsEvaluatorFactory.java    |  2 +-
 ...ifecycleManagerRemovingTransformEvaluatorTest.java |  8 ++++----
 .../beam/runners/direct/ParDoEvaluatorTest.java       |  4 ++--
 6 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 9bcd569..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
 class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
   private static final Logger LOG =
       LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
-  private final ParDoEvaluator<InputT, ?> underlying;
+  private final ParDoEvaluator<InputT> underlying;
   private final DoFnLifecycleManager lifecycleManager;
 
   public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
   }
 
   private DoFnLifecycleManagerRemovingTransformEvaluator(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     this.underlying = underlying;
     this.lifecycleManager = lifecycleManager;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 49d0723..131716f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
-  public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
@@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       throw UserCodeException.wrap(e);
     }
 
-    return new ParDoEvaluator<>(
-        evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
+    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private final EvaluationContext evaluationContext;
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
@@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
 
   private ParDoEvaluator(
-      EvaluationContext evaluationContext,
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       BundleOutputManager outputManager,
       DirectStepContext stepContext) {
-    this.evaluationContext = evaluationContext;
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.outputManager = outputManager;
@@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
-    StepTransformResult.Builder resultBuilder;
+    StepTransformResult.Builder<InputT> resultBuilder;
     CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
     if (state != null) {
       resultBuilder =
-          StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+          StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
               .withState(state);
     } else {
       resultBuilder = StepTransformResult.withoutHold(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 0372295..93f204a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         fnManager);
   }
 
-  ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+  ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64cef35..00b16dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index d046ce5..1ac4d6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void delegatesToUnderlying() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
 
     DoFn<?, ?> original = lifecycleManager.get();
@@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInOnTimer() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class)
         .when(underlying)
         .onTimer(any(TimerData.class), any(BoundedWindow.class));
@@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInFinishBundle() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).finishBundle();
 
     DoFn<?, ?> original = lifecycleManager.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 65a1248..2be0f9d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
-    ParDoEvaluator<Integer, Integer> evaluator =
+    ParDoEvaluator<Integer> evaluator =
         createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
@@ -130,7 +130,7 @@ public class ParDoEvaluatorTest {
             WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
   }
 
-  private ParDoEvaluator<Integer, Integer> createEvaluator(
+  private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
       PCollection<Integer> output) {


[2/7] beam git commit: ProcessFn remembers more info about its application context

Posted by jk...@apache.org.
ProcessFn remembers more info about its application context


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fd88901
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fd88901
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fd88901

Branch: refs/heads/master
Commit: 3fd889015afa8528801d2c35c8c9f72b944ea472
Parents: a51bdd2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:39:51 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      | 35 +++++++++++++++-----
 .../beam/runners/core/SplittableParDoTest.java  |  8 ++++-
 2 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 9cc965a..44db1f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -115,7 +115,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             fn,
             input.getCoder(),
             restrictionCoder,
-            input.getWindowingStrategy(),
+            (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
             parDo.getSideInputs(),
             parDo.getMainOutputTag(),
             parDo.getAdditionalOutputTags()));
@@ -185,7 +185,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
     private final Coder<RestrictionT> restrictionCoder;
-    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final WindowingStrategy<InputT, ?> windowingStrategy;
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList additionalOutputTags;
@@ -202,7 +202,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<InputT, OutputT> fn,
         Coder<InputT> elementCoder,
         Coder<RestrictionT> restrictionCoder,
-        WindowingStrategy<?, ?> windowingStrategy,
+        WindowingStrategy<InputT, ?> windowingStrategy,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList additionalOutputTags) {
@@ -234,7 +234,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
         DoFn<InputT, OutputT> fn) {
       return new SplittableParDo.ProcessFn<>(
-          fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+          fn, elementCoder, restrictionCoder, windowingStrategy);
     }
 
     @Override
@@ -351,7 +351,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
 
     private final DoFn<InputT, OutputT> fn;
-    private final Coder<? extends BoundedWindow> windowCoder;
+    private final Coder<InputT> elementCoder;
+    private final Coder<RestrictionT> restrictionCoder;
+    private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
 
     private transient StateInternalsFactory<String> stateInternalsFactory;
     private transient TimerInternalsFactory<String> timerInternalsFactory;
@@ -364,11 +366,16 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<InputT, OutputT> fn,
         Coder<InputT> elementCoder,
         Coder<RestrictionT> restrictionCoder,
-        Coder<? extends BoundedWindow> windowCoder) {
+        WindowingStrategy<InputT, ?> inputWindowingStrategy) {
       this.fn = fn;
-      this.windowCoder = windowCoder;
+      this.elementCoder = elementCoder;
+      this.restrictionCoder = restrictionCoder;
+      this.inputWindowingStrategy = inputWindowingStrategy;
       this.elementTag =
-          StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+          StateTags.value(
+              "element",
+              WindowedValue.getFullCoder(
+                  elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
       this.restrictionTag = StateTags.value("restriction", restrictionCoder);
     }
 
@@ -389,6 +396,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return fn;
     }
 
+    public Coder<InputT> getElementCoder() {
+      return elementCoder;
+    }
+
+    public Coder<RestrictionT> getRestrictionCoder() {
+      return restrictionCoder;
+    }
+
+    public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
+      return inputWindowingStrategy;
+    }
+
     @Setup
     public void setup() throws Exception {
       invoker = DoFnInvokers.invokerFor(fn);

http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 2c89543..5629635 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -51,11 +51,13 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -220,9 +222,13 @@ public class SplittableParDoTest {
         int maxOutputsPerBundle,
         Duration maxBundleDuration)
         throws Exception {
+      // The exact windowing strategy doesn't matter in this test, but it should be able to
+      // encode IntervalWindow instances because that's what all tests here use.
+      WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
+          (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
       final SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
           new SplittableParDo.ProcessFn<>(
-              fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+              fn, inputCoder, restrictionCoder, windowingStrategy);
       this.tester = DoFnTester.of(processFn);
       this.timerInternals = new InMemoryTimerInternals();
       this.stateInternals = new TestInMemoryStateInternals<>("dummy");


[6/7] beam git commit: Explodes windows before GBKIKWI

Posted by jk...@apache.org.
Explodes windows before GBKIKWI

Also
* Adds a test for windowed side inputs that requires this
  behavior.
* Adds a test category for SDF with windowed side input.
  Runners should gradually implement this properly. For now
  only the direct runner implements this properly.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ac3ac50
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ac3ac50
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ac3ac50

Branch: refs/heads/master
Commit: 6ac3ac50fec2eb02927c0a07ca928967cfef5652
Parents: b93de58
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 11:28:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      | 75 +++++++++---------
 .../beam/runners/core/SplittableParDoTest.java  | 82 +++++++-------------
 runners/flink/runner/pom.xml                    |  3 +-
 ...esSplittableParDoWithWindowedSideInputs.java | 26 +++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 41 ++++++++++
 5 files changed, 137 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 44db1f7..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,10 +19,8 @@ package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.UUID;
@@ -138,6 +136,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             .setCoder(splitCoder)
             .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
             .setCoder(splitCoder)
+            // ProcessFn requires all input elements to be in a single window and have a single
+            // element per work item. This must precede the unique keying so each key has a single
+            // associated element.
+            .apply(
+                "Explode windows",
+                ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>()))
             .apply(
                 "Assign unique key",
                 WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
@@ -158,6 +162,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   }
 
   /**
+   * A {@link DoFn} that forces each of its outputs to be in a single window, by indicating to the
+   * runner that it observes the window of its input element, so the runner is forced to apply it to
+   * each input in a single window and thus its output is also in a single window.
+   */
+  private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> {
+    @ProcessElement
+    public void process(ProcessContext c, BoundedWindow window) {
+      c.output(c.element());
+    }
+  }
+
+  /**
    * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
    * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
    *
@@ -317,6 +333,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
    * by creating a tracker for the restriction and checkpointing/resuming processing later if
    * necessary.
+   *
+   * <p>Takes {@link KeyedWorkItem} and assumes that the KeyedWorkItem contains a single element
+   * (or a single timer set by {@link ProcessFn itself}), in a single window. This is necessary
+   * because {@link ProcessFn} sets timers, and timers are namespaced to a single window and it
+   * should be the window of the input element.
+   *
+   * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
    */
   @VisibleForTesting
   public static class ProcessFn<
@@ -441,7 +464,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       // Subsequent calls are timer firings and the element has to be retrieved from the state.
       TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
       boolean isSeedCall = (timer == null);
-      StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+      StateNamespace stateNamespace;
+      if (isSeedCall) {
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+            Iterables.getOnlyElement(c.element().elementsIterable());
+        BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
+        stateNamespace =
+            StateNamespaces.window(
+                (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
+      } else {
+        stateNamespace = timer.getNamespace();
+      }
+
       ValueState<WindowedValue<InputT>> elementState =
           stateInternals.state(stateNamespace, elementTag);
       ValueState<RestrictionT> restrictionState =
@@ -451,15 +485,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
       ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
       if (isSeedCall) {
-        // The element and restriction are available in c.element().
-        // elementsIterable() will, by construction of SplittableParDo, contain the same value
-        // potentially in several different windows. We implode this into a single WindowedValue
-        // in order to simplify the rest of the code and avoid iterating over elementsIterable()
-        // explicitly. The windows of this WindowedValue will be propagated to windows of the
-        // output. This is correct because a splittable DoFn is not allowed to inspect the window
-        // of its element.
         WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
-            implodeWindows(c.element().elementsIterable());
+            Iterables.getOnlyElement(c.element().elementsIterable());
         WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
         elementState.write(element);
         elementAndRestriction =
@@ -498,32 +525,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
               stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
     }
 
-    /**
-     * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
-     * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
-     * of the same value with the same timestamp, but different window sets.
-     *
-     * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
-     * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
-     * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
-     * being delivered separately rather than all at once. It is also legal to do because splittable
-     * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
-     * set of windows of its input to its output.
-     */
-    private static <InputT, RestrictionT>
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
-            Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
-      WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
-          Iterables.getFirst(values, null);
-      checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
-      ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
-      for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
-        windows.addAll(value.getWindows());
-      }
-      return WindowedValue.of(
-          first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
-    }
-
     private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
       return fn.new Context() {
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 5629635..1a44453 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executors;
@@ -194,11 +195,6 @@ public class SplittableParDoTest {
 
   // ------------------------------- Tests for ProcessFn ---------------------------------
 
-  enum WindowExplosion {
-    EXPLODE_WINDOWS,
-    DO_NOT_EXPLODE_WINDOWS
-  }
-
   /**
    * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
    * {@link DoFn.ProcessElement} calls).
@@ -293,24 +289,13 @@ public class SplittableParDoTest {
               ElementAndRestriction.of(element, restriction),
               currentProcessingTime,
               GlobalWindow.INSTANCE,
-              PaneInfo.ON_TIME_AND_ONLY_FIRING),
-          WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
+              PaneInfo.ON_TIME_AND_ONLY_FIRING));
     }
 
-    void startElement(
-        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
-        WindowExplosion explosion)
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
         throws Exception {
-      switch (explosion) {
-        case EXPLODE_WINDOWS:
-          tester.processElement(
-              KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
-          break;
-        case DO_NOT_EXPLODE_WINDOWS:
-          tester.processElement(
-              KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
-          break;
-      }
+      tester.processElement(
+          KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
     }
 
     /**
@@ -394,46 +379,39 @@ public class SplittableParDoTest {
   }
 
   @Test
-  public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
-    // Tests that ProcessFn correctly propagates windows and timestamp of the element
+  public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
+    // Tests that ProcessFn correctly propagates the window and timestamp of the element
     // inside the KeyedWorkItem.
     // The underlying DoFn is actually monolithic, so this doesn't test splitting.
     DoFn<Integer, String> fn = new ToStringFn();
 
     Instant base = Instant.now();
 
-    IntervalWindow w1 =
+    IntervalWindow w =
         new IntervalWindow(
             base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
-    IntervalWindow w2 =
-        new IntervalWindow(
-            base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
-    IntervalWindow w3 =
-        new IntervalWindow(
-            base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
-
-    for (WindowExplosion explosion : WindowExplosion.values()) {
-      ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
-          new ProcessFnTester<>(
-              base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
-              MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-      tester.startElement(
-          WindowedValue.of(
-              ElementAndRestriction.of(42, new SomeRestriction()),
-              base,
-              Arrays.asList(w1, w2, w3),
-              PaneInfo.ON_TIME_AND_ONLY_FIRING),
-          explosion);
-
-      for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
-        assertEquals(
-            Arrays.asList(
-                TimestampedValue.of("42a", base),
-                TimestampedValue.of("42b", base),
-                TimestampedValue.of("42c", base)),
-            tester.peekOutputElementsInWindow(w));
-      }
-    }
+
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            BigEndianIntegerCoder.of(),
+            SerializableCoder.of(SomeRestriction.class),
+            MAX_OUTPUTS_PER_BUNDLE,
+            MAX_BUNDLE_DURATION);
+    tester.startElement(
+        WindowedValue.of(
+            ElementAndRestriction.of(42, new SomeRestriction()),
+            base,
+            Collections.singletonList(w),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    assertEquals(
+        Arrays.asList(
+            TimestampedValue.of("42a", base),
+            TimestampedValue.of("42b", base),
+            TimestampedValue.of("42c", base)),
+        tester.peekOutputElementsInWindow(w));
   }
 
   private static class WatermarkUpdateFn extends DoFn<Instant, String> {

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 95880f4..1e6452d 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -91,7 +91,8 @@
                     org.apache.beam.sdk.testing.UsesMapState,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream
+                    org.apache.beam.sdk.testing.UsesTestStream,
+                    org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
new file mode 100644
index 0000000..2b1d673
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize splittable {@link ParDo} and use
+ * windowed side inputs.
+ */
+public interface UsesSplittableParDoWithWindowedSideInputs {}

http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 30329f4..a0f1fd3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
@@ -252,6 +253,46 @@ public class SplittableDoFnTest implements Serializable {
     p.run();
   }
 
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    UsesSplittableParDo.class,
+    UsesSplittableParDoWithWindowedSideInputs.class
+  })
+  public void testWindowedSideInput() throws Exception {
+    PCollection<Integer> mainInput =
+        p.apply("main",
+                Create.timestamped(
+                    TimestampedValue.of(0, new Instant(0)),
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(4)),
+                    TimestampedValue.of(5, new Instant(5)),
+                    TimestampedValue.of(6, new Instant(6)),
+                    TimestampedValue.of(7, new Instant(7))))
+            .apply("window 2", Window.<Integer>into(FixedWindows.of(Duration.millis(2))));
+
+    PCollectionView<String> sideInput =
+        p.apply("side",
+                Create.timestamped(
+                    TimestampedValue.of("a", new Instant(0)),
+                    TimestampedValue.of("b", new Instant(4))))
+            .apply("window 4", Window.<String>into(FixedWindows.of(Duration.millis(4))))
+            .apply("singleton", View.<String>asSingleton());
+
+    PCollection<String> res =
+        mainInput.apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+    PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
+
+    p.run();
+
+    // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+    // properly access side inputs.
+    // TODO: also add test coverage when some of the windows of the side input are not ready.
+  }
+
   private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
     private final TupleTag<String> additionalOutput;