You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/04 17:52:34 UTC

[1/3] incubator-beam git commit: Move ReadyCheckingSideInputReader to util

Repository: incubator-beam
Updated Branches:
  refs/heads/master 32970c985 -> 97945648c


Move ReadyCheckingSideInputReader to util

This SideInputReader allows callers to check for a side input being
available before attempting to read the contents


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e8df24a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e8df24a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e8df24a

Branch: refs/heads/master
Commit: 3e8df24a2ced82be7ebe26837f96f651acc1ac06
Parents: b9116ac
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 2 10:03:43 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 2 10:25:25 2016 -0700

----------------------------------------------------------------------
 .../direct/InProcessEvaluationContext.java      | 11 +------
 .../direct/InProcessSideInputContainer.java     |  2 +-
 .../direct/InProcessSideInputContainerTest.java |  2 +-
 .../sdk/util/ReadyCheckingSideInputReader.java  | 34 ++++++++++++++++++++
 4 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d9a7ff0..92e5aa5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -333,16 +334,6 @@ class InProcessEvaluationContext {
     return sideInputContainer.createReaderForViews(sideInputs);
   }
 
-  /**
-   * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
-   * had its contents set in a window.
-   */
-  static interface ReadyCheckingSideInputReader extends SideInputReader {
-    /**
-     * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
-     */
-    boolean isReady(PCollectionView<?> view, BoundedWindow window);
-  }
 
   /**
    * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index f4980ef..d0f29ff 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index d8a78f2..8f89e70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.doAnswer;
 
-import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
new file mode 100644
index 0000000..cb38a55
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
+ * had its contents set in a window.
+ */
+public interface ReadyCheckingSideInputReader extends SideInputReader {
+  /**
+   * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+   */
+  boolean isReady(PCollectionView<?> view, BoundedWindow window);
+}
+


[3/3] incubator-beam git commit: This closes #258

Posted by ke...@apache.org.
This closes #258


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97945648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97945648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97945648

Branch: refs/heads/master
Commit: 97945648c8efbaaea6780ec1b6027c6b5be7527f
Parents: 32970c9 f57c1dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 4 10:52:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 10:52:19 2016 -0700

----------------------------------------------------------------------
 .../direct/InProcessEvaluationContext.java      |  11 +-
 .../direct/InProcessSideInputContainer.java     |   2 +-
 .../direct/InProcessSideInputContainerTest.java |   2 +-
 .../sdk/util/PushbackSideInputDoFnRunner.java   | 115 +++++++++
 .../sdk/util/ReadyCheckingSideInputReader.java  |  34 +++
 .../sdk/util/IdentitySideInputWindowFn.java     |  54 +++++
 .../util/PushbackSideInputDoFnRunnerTest.java   | 234 +++++++++++++++++++
 7 files changed, 440 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97945648/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97945648/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------


[2/3] incubator-beam git commit: Add PushbackSideInputDoFnRunner

Posted by ke...@apache.org.
Add PushbackSideInputDoFnRunner

This DoFnRunner wraps a DoFnRunner and provides an additional method to
process an element in all the windows where all side inputs are ready,
returning any elements that it could not process.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f57c1dcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f57c1dcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f57c1dcf

Branch: refs/heads/master
Commit: f57c1dcfe24e80456d4d8c0422eeb2ee9617ca16
Parents: 3e8df24
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 2 10:04:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 10:51:52 2016 -0700

----------------------------------------------------------------------
 .../sdk/util/PushbackSideInputDoFnRunner.java   | 115 +++++++++
 .../sdk/util/IdentitySideInputWindowFn.java     |  54 +++++
 .../util/PushbackSideInputDoFnRunnerTest.java   | 234 +++++++++++++++++++
 3 files changed, 403 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..4eeedf6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  private Set<BoundedWindow> notReadyWindows;
+
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * for each window the element is in that is ready.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   * that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      boolean isReady = !notReadyWindows.contains(mainInputWindow);
+      for (PCollectionView<?> view : views) {
+        BoundedWindow sideInputWindow =
+            view.getWindowingStrategyInternal()
+                .getWindowFn()
+                .getSideInputWindow(mainInputWindow);
+        if (!sideInputReader.isReady(view, sideInputWindow)) {
+          isReady = false;
+          break;
+        }
+      }
+      if (isReady) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
new file mode 100644
index 0000000..ecab6f8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import java.util.Collection;
+
+/**
+ * A {@link WindowFn} for use during tests that returns the input window for calls to
+ * {@link #getSideInputWindow(BoundedWindow)}.
+ */
+class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
+  @Override
+  public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
+      throws Exception {
+    return (Collection<BoundedWindow>) c.windows();
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return true;
+  }
+
+  @Override
+  public Coder<BoundedWindow> windowCoder() {
+    // not used
+    return (Coder) GlobalWindow.Coder.INSTANCE;
+  }
+
+  @Override
+  public BoundedWindow getSideInputWindow(BoundedWindow window) {
+    return window;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..8885118
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link PushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class PushbackSideInputDoFnRunnerTest {
+  @Mock private ReadyCheckingSideInputReader reader;
+  private TestDoFnRunner<Integer, Integer> underlying;
+  private PCollectionView<Integer> singletonView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    singletonView =
+        created
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    underlying = new TestDoFnRunner<>();
+  }
+
+  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+      ImmutableList<PCollectionView<?>> views) {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        PushbackSideInputDoFnRunner.create(underlying, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  @Test
+  public void startFinishBundleDelegates() {
+    PushbackSideInputDoFnRunner runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    assertThat(underlying.started, is(true));
+    assertThat(underlying.finished, is(false));
+    runner.finishBundle();
+    assertThat(underlying.finished, is(true));
+  }
+
+  @Test
+  public void processElementSideInputNotReady() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> oneWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            new IntervalWindow(new Instant(-500L), new Instant(0L)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> oneWindowPushback =
+        runner.processElementInReadyWindows(oneWindow);
+    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadyMultipleWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadySomeWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+        .thenReturn(false);
+    when(
+            reader.isReady(
+                Mockito.eq(singletonView),
+                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+        .thenReturn(true);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    IntervalWindow bigWindow =
+        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(
+        multiWindowPushback,
+        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+  }
+
+  @Test
+  public void processElementSideInputReadyAllWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+  }
+
+  @Test
+  public void processElementNoSideInputs() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+  }
+
+  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+    List<WindowedValue<InputT>> inputElems;
+    private boolean started = false;
+    private boolean finished = false;
+
+    @Override
+    public void startBundle() {
+      started = true;
+      inputElems = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      inputElems.add(elem);
+    }
+
+    @Override
+    public void finishBundle() {
+      finished = true;
+    }
+  }
+}