You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/04 17:52:34 UTC
[1/3] incubator-beam git commit: Move ReadyCheckingSideInputReader to util
Repository: incubator-beam
Updated Branches:
refs/heads/master 32970c985 -> 97945648c
Move ReadyCheckingSideInputReader to util
This SideInputReader allows callers to check for a side input being
available before attempting to read its contents.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e8df24a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e8df24a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e8df24a
Branch: refs/heads/master
Commit: 3e8df24a2ced82be7ebe26837f96f651acc1ac06
Parents: b9116ac
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 2 10:03:43 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 2 10:25:25 2016 -0700
----------------------------------------------------------------------
.../direct/InProcessEvaluationContext.java | 11 +------
.../direct/InProcessSideInputContainer.java | 2 +-
.../direct/InProcessSideInputContainerTest.java | 2 +-
.../sdk/util/ReadyCheckingSideInputReader.java | 34 ++++++++++++++++++++
4 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d9a7ff0..92e5aa5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
@@ -333,16 +334,6 @@ class InProcessEvaluationContext {
return sideInputContainer.createReaderForViews(sideInputs);
}
- /**
- * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
- * had its contents set in a window.
- */
- static interface ReadyCheckingSideInputReader extends SideInputReader {
- /**
- * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
- */
- boolean isReady(PCollectionView<?> view, BoundedWindow window);
- }
/**
* Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index f4980ef..d0f29ff 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index d8a78f2..8f89e70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doAnswer;
-import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e8df24a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
new file mode 100644
index 0000000..cb38a55
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
+ * had its contents set in a window.
+ *
+ * <p>This lets callers check for a side input being available before attempting to read its
+ * contents.
+ */
+public interface ReadyCheckingSideInputReader extends SideInputReader {
+ /**
+ * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+ *
+ * @param view the view whose readiness is checked
+ * @param window the window in which the view's readiness is checked
+ * @return true if the view is ready in the window
+ */
+ boolean isReady(PCollectionView<?> view, BoundedWindow window);
+}
+
[3/3] incubator-beam git commit: This closes #258
Posted by ke...@apache.org.
This closes #258
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97945648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97945648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97945648
Branch: refs/heads/master
Commit: 97945648c8efbaaea6780ec1b6027c6b5be7527f
Parents: 32970c9 f57c1dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 4 10:52:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 10:52:19 2016 -0700
----------------------------------------------------------------------
.../direct/InProcessEvaluationContext.java | 11 +-
.../direct/InProcessSideInputContainer.java | 2 +-
.../direct/InProcessSideInputContainerTest.java | 2 +-
.../sdk/util/PushbackSideInputDoFnRunner.java | 115 +++++++++
.../sdk/util/ReadyCheckingSideInputReader.java | 34 +++
.../sdk/util/IdentitySideInputWindowFn.java | 54 +++++
.../util/PushbackSideInputDoFnRunnerTest.java | 234 +++++++++++++++++++
7 files changed, 440 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97945648/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97945648/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Add PushbackSideInputDoFnRunner
Posted by ke...@apache.org.
Add PushbackSideInputDoFnRunner
This DoFnRunner wraps a DoFnRunner and provides an additional method to
process an element in all the windows where all side inputs are ready,
returning any elements that it could not process.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f57c1dcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f57c1dcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f57c1dcf
Branch: refs/heads/master
Commit: f57c1dcfe24e80456d4d8c0422eeb2ee9617ca16
Parents: 3e8df24
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 2 10:04:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 10:51:52 2016 -0700
----------------------------------------------------------------------
.../sdk/util/PushbackSideInputDoFnRunner.java | 115 +++++++++
.../sdk/util/IdentitySideInputWindowFn.java | 54 +++++
.../util/PushbackSideInputDoFnRunnerTest.java | 234 +++++++++++++++++++
3 files changed, 403 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..4eeedf6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements whose side inputs are not ready,
+ * instead returning them from {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  /**
+   * Main input windows found not ready since {@link #startBundle()}; {@code null} before the
+   * first bundle starts and after {@link #finishBundle()}.
+   */
+  private Set<BoundedWindow> notReadyWindows;
+
+  /**
+   * Creates a runner that delegates to {@code underlying} and consults {@code sideInputReader}
+   * for the readiness of each of the provided {@code views}.
+   */
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  /**
+   * Resets the set of windows known to be not ready, then calls the underlying
+   * {@link DoFnRunner#startBundle()}.
+   */
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * for each window the element is in that is ready.
+   *
+   * <p>If this runner has no views, the element is processed as-is, without exploding its
+   * windows, and nothing is pushed back.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   *     that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      if (isReady(mainInputWindow)) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  /**
+   * Returns true if every view is ready in the side input window that corresponds to the
+   * provided main input window.
+   */
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    if (notReadyWindows.contains(mainInputWindow)) {
+      // Already found not ready during this bundle; do not query the reader again.
+      return false;
+    }
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} without checking
+   * whether any side input is ready.
+   */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
new file mode 100644
index 0000000..ecab6f8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import java.util.Collection;
+
+/**
+ * A {@link WindowFn} for use during tests that returns the input window for calls to
+ * {@link #getSideInputWindow(BoundedWindow)}.
+ */
+class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
+  /** Returns the windows reported by the assignment context, unchanged. */
+  @Override
+  // The collection is passed through untouched; the cast only adapts its declared element type.
+  @SuppressWarnings("unchecked")
+  public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
+      throws Exception {
+    return (Collection<BoundedWindow>) c.windows();
+  }
+
+  /** Reports every other {@link WindowFn} as compatible. */
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return true;
+  }
+
+  /** Returns the {@link GlobalWindow} coder as a placeholder. */
+  @Override
+  // Deliberate raw cast: this test-only WindowFn does not use its window coder.
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public Coder<BoundedWindow> windowCoder() {
+    // not used
+    return (Coder) GlobalWindow.Coder.INSTANCE;
+  }
+
+  /** Maps a main input window to itself. */
+  @Override
+  public BoundedWindow getSideInputWindow(BoundedWindow window) {
+    return window;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..8885118
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link PushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class PushbackSideInputDoFnRunnerTest {
+  @Mock private ReadyCheckingSideInputReader reader;
+  private TestDoFnRunner<Integer, Integer> underlying;
+  private PCollectionView<Integer> singletonView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    // The view is windowed by IdentitySideInputWindowFn, so the side input window that the
+    // runner asks the reader about is the main input window itself.
+    singletonView =
+        created
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    underlying = new TestDoFnRunner<>();
+  }
+
+  /**
+   * Creates a runner over {@link #underlying}, the provided views, and the mock {@link #reader},
+   * and starts a bundle on it.
+   */
+  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+      ImmutableList<PCollectionView<?>> views) {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        PushbackSideInputDoFnRunner.create(underlying, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  /** Starting and finishing a bundle is forwarded to the underlying runner. */
+  @Test
+  public void startFinishBundleDelegates() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    assertThat(underlying.started, is(true));
+    assertThat(underlying.finished, is(false));
+    runner.finishBundle();
+    assertThat(underlying.finished, is(true));
+  }
+
+  /** An element in a single window that is not ready is pushed back and not processed. */
+  @Test
+  public void processElementSideInputNotReady() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> oneWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            new IntervalWindow(new Instant(-500L), new Instant(0L)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> oneWindowPushback =
+        runner.processElementInReadyWindows(oneWindow);
+    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  /** When no window is ready, the element is pushed back once per window it is in. */
+  @Test
+  public void processElementSideInputNotReadyMultipleWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  /**
+   * Only the global window is reported not ready, so only the element in the global window is
+   * pushed back; the element is processed in the other two windows.
+   */
+  @Test
+  public void processElementSideInputNotReadySomeWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+        .thenReturn(false);
+    when(
+            reader.isReady(
+                Mockito.eq(singletonView),
+                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+        .thenReturn(true);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    IntervalWindow bigWindow =
+        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(
+        multiWindowPushback,
+        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+  }
+
+  /** When every window is ready, nothing is pushed back and each exploded element is processed. */
+  @Test
+  public void processElementSideInputReadyAllWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+  }
+
+  /** With no views, the element reaches the underlying runner without being exploded. */
+  @Test
+  public void processElementNoSideInputs() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+  }
+
+  /**
+   * A {@link DoFnRunner} that records the elements it is asked to process and whether a bundle
+   * has been started and finished.
+   */
+  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+    // Assigned in startBundle(); null until a bundle has been started.
+    List<WindowedValue<InputT>> inputElems;
+    private boolean started = false;
+    private boolean finished = false;
+
+    @Override
+    public void startBundle() {
+      started = true;
+      inputElems = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      inputElems.add(elem);
+    }
+
+    @Override
+    public void finishBundle() {
+      finished = true;
+    }
+  }
+}