You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:40 UTC
[06/50] [abbrv] beam git commit: Extracts interface from PushbackSideInputDoFnRunner
Extracts interface from PushbackSideInputDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e1a2675
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e1a2675
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e1a2675
Branch: refs/heads/DSL_SQL
Commit: 7e1a2675699ef14291e8c112010be66fff4b8581
Parents: 1cc16b0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:41:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 3 +-
.../core/PushbackSideInputDoFnRunner.java | 106 +------
.../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++
.../core/PushbackSideInputDoFnRunnerTest.java | 282 -------------------
.../SimplePushbackSideInputDoFnRunnerTest.java | 282 +++++++++++++++++++
.../beam/runners/direct/ParDoEvaluator.java | 3 +-
.../wrappers/streaming/DoFnOperator.java | 12 +-
.../streaming/SplittableDoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
9 files changed, 424 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index bad5be2..52d1d43 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -48,6 +48,7 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StatefulDoFnRunner;
@@ -368,7 +369,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
pushbackDoFnRunner =
- PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 4ad20b5..bab1dc7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -17,113 +17,35 @@
*/
package org.apache.beam.runners.core;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
/**
- * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
- * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ * Interface for runners of {@link DoFn DoFns} that support pushback when reading side inputs,
+ * i.e. that return elements that could not be processed because they require reading a side
+ * input window that is not ready.
*/
-public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- private final DoFnRunner<InputT, OutputT> underlying;
- private final Collection<PCollectionView<?>> views;
- private final ReadyCheckingSideInputReader sideInputReader;
-
- private Set<BoundedWindow> notReadyWindows;
-
- public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
- }
-
- private PushbackSideInputDoFnRunner(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- this.underlying = underlying;
- this.views = views;
- this.sideInputReader = sideInputReader;
- }
-
- @Override
- public void startBundle() {
- notReadyWindows = new HashSet<>();
- underlying.startBundle();
- }
+public interface PushbackSideInputDoFnRunner<InputT, OutputT> {
+ /** Calls the underlying {@link DoFn.StartBundle} method. */
+ void startBundle();
/**
- * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+ * Calls the underlying {@link DoFn.ProcessElement} method for the provided element
* for each window the element is in that is ready.
*
* @param elem the element to process in all ready windows
* @return each element that could not be processed because it requires a side input window
* that is not ready.
*/
- public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
- if (views.isEmpty()) {
- // When there are no side inputs, we can preserve the compressed representation.
- processElement(elem);
- return Collections.emptyList();
- }
- ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
- for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
- BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
- if (isReady(mainInputWindow)) {
- // When there are any side inputs, we have to process the element in each window
- // individually, to disambiguate access to per-window side inputs.
- processElement(windowElem);
- } else {
- notReadyWindows.add(mainInputWindow);
- pushedBack.add(windowElem);
- }
- }
- return pushedBack.build();
- }
-
- private boolean isReady(BoundedWindow mainInputWindow) {
- if (notReadyWindows.contains(mainInputWindow)) {
- return false;
- }
- for (PCollectionView<?> view : views) {
- BoundedWindow sideInputWindow =
- view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
- if (!sideInputReader.isReady(view, sideInputWindow)) {
- return false;
- }
- }
- return true;
- }
+ Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- underlying.processElement(elem);
- }
+ /** Calls the underlying {@link DoFn.OnTimer} method. */
+ void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain);
- @Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
- TimeDomain timeDomain) {
- underlying.onTimer(timerId, window, timestamp, timeDomain);
- }
-
- /**
- * Call the underlying {@link DoFnRunner#finishBundle()}.
- */
- @Override
- public void finishBundle() {
- notReadyWindows = null;
- underlying.finishBundle();
- }
+ /** Calls the underlying {@link DoFn.FinishBundle} method. */
+ void finishBundle();
}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..50d301b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PushbackSideInputDoFnRunner} that delegates to an underlying {@link DoFnRunner},
+ * returning from {@link #processElementInReadyWindows} the elements whose side inputs aren't ready.
+ */
+public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
+ implements PushbackSideInputDoFnRunner<InputT, OutputT> {
+ private final DoFnRunner<InputT, OutputT> underlying;
+ private final Collection<PCollectionView<?>> views;
+ private final ReadyCheckingSideInputReader sideInputReader;
+
+ private Set<BoundedWindow> notReadyWindows; // Windows found not ready this bundle; null outside a bundle.
+
+ public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ return new SimplePushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+ }
+
+ private SimplePushbackSideInputDoFnRunner(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ this.underlying = underlying;
+ this.views = views;
+ this.sideInputReader = sideInputReader;
+ }
+
+ @Override
+ public void startBundle() {
+ notReadyWindows = new HashSet<>();
+ underlying.startBundle();
+ }
+
+ @Override
+ public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+ if (views.isEmpty()) {
+ // When there are no side inputs, we can preserve the compressed representation.
+ underlying.processElement(elem);
+ return Collections.emptyList();
+ }
+ ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+ for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+ BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+ if (isReady(mainInputWindow)) {
+ // When there are any side inputs, we have to process the element in each window
+ // individually, to disambiguate access to per-window side inputs.
+ underlying.processElement(windowElem);
+ } else {
+ notReadyWindows.add(mainInputWindow);
+ pushedBack.add(windowElem);
+ }
+ }
+ return pushedBack.build();
+ }
+
+ private boolean isReady(BoundedWindow mainInputWindow) {
+ if (notReadyWindows.contains(mainInputWindow)) { // Once not ready, stays pushed back this bundle.
+ return false;
+ }
+ for (PCollectionView<?> view : views) {
+ BoundedWindow sideInputWindow =
+ view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+ if (!sideInputReader.isReady(view, sideInputWindow)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ underlying.onTimer(timerId, window, timestamp, timeDomain);
+ }
+
+ @Override
+ public void finishBundle() {
+ notReadyWindows = null;
+ underlying.finishBundle();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
deleted file mode 100644
index cb057b8..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link PushbackSideInputDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class PushbackSideInputDoFnRunnerTest {
- @Mock private ReadyCheckingSideInputReader reader;
- private TestDoFnRunner<Integer, Integer> underlying;
- private PCollectionView<Integer> singletonView;
-
- @Rule
- public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
- singletonView =
- created
- .apply(Window.into(new IdentitySideInputWindowFn()))
- .apply(Sum.integersGlobally().asSingletonView());
-
- underlying = new TestDoFnRunner<>();
- }
-
- private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
- ImmutableList<PCollectionView<?>> views) {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- PushbackSideInputDoFnRunner.create(underlying, views, reader);
- runner.startBundle();
- return runner;
- }
-
- @Test
- public void startFinishBundleDelegates() {
- PushbackSideInputDoFnRunner runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- assertThat(underlying.started, is(true));
- assertThat(underlying.finished, is(false));
- runner.finishBundle();
- assertThat(underlying.finished, is(true));
- }
-
- @Test
- public void processElementSideInputNotReady() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> oneWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> oneWindowPushback =
- runner.processElementInReadyWindows(oneWindow);
- assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadyMultipleWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadySomeWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
- .thenReturn(false);
- when(
- reader.isReady(
- Mockito.eq(singletonView),
- org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
- .thenReturn(true);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
- IntervalWindow bigWindow =
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
- PaneInfo.NO_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(
- multiWindowPushback,
- containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
- assertThat(
- underlying.inputElems,
- containsInAnyOrder(
- WindowedValue.of(
- 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
- WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
- }
-
- @Test
- public void processElementSideInputReadyAllWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(true);
-
- ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
- PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- assertThat(
- underlying.inputElems,
- containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
- }
-
- @Test
- public void processElementNoSideInputs() {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of());
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- // Should preserve the compressed representation when there's no side inputs.
- assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
- }
-
- /** Tests that a call to onTimer gets delegated. */
- @Test
- public void testOnTimerCalled() {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of());
-
- String timerId = "fooTimer";
- IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
- Instant timestamp = new Instant(72);
-
- // Mocking is not easily compatible with annotation analysis, so we manually record
- // the method call.
- runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
-
- assertThat(
- underlying.firedTimers,
- contains(
- TimerData.of(
- timerId,
- StateNamespaces.window(IntervalWindow.getCoder(), window),
- timestamp,
- TimeDomain.EVENT_TIME)));
- }
-
- private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- List<WindowedValue<InputT>> inputElems;
- List<TimerData> firedTimers;
- private boolean started = false;
- private boolean finished = false;
-
- @Override
- public void startBundle() {
- started = true;
- inputElems = new ArrayList<>();
- firedTimers = new ArrayList<>();
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- inputElems.add(elem);
- }
-
- @Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
- TimeDomain timeDomain) {
- firedTimers.add(
- TimerData.of(
- timerId,
- StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
- timestamp,
- timeDomain));
- }
-
- @Override
- public void finishBundle() {
- finished = true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..ba3f926
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link SimplePushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class SimplePushbackSideInputDoFnRunnerTest {
+ @Mock private ReadyCheckingSideInputReader reader;
+ private TestDoFnRunner<Integer, Integer> underlying;
+ private PCollectionView<Integer> singletonView;
+
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+ singletonView =
+ created
+ .apply(Window.into(new IdentitySideInputWindowFn()))
+ .apply(Sum.integersGlobally().asSingletonView());
+
+ underlying = new TestDoFnRunner<>();
+ }
+
+ private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+ ImmutableList<PCollectionView<?>> views) {
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ SimplePushbackSideInputDoFnRunner.create(underlying, views, reader);
+ runner.startBundle();
+ return runner;
+ }
+
+ @Test
+ public void startFinishBundleDelegates() {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ assertThat(underlying.started, is(true));
+ assertThat(underlying.finished, is(false));
+ runner.finishBundle();
+ assertThat(underlying.finished, is(true));
+ }
+
+ @Test
+ public void processElementSideInputNotReady() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> oneWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> oneWindowPushback =
+ runner.processElementInReadyWindows(oneWindow);
+ assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadyMultipleWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadySomeWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+ .thenReturn(false);
+ when(
+ reader.isReady(
+ Mockito.eq(singletonView),
+ org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+ .thenReturn(true);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+ IntervalWindow bigWindow =
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+ PaneInfo.NO_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(
+ multiWindowPushback,
+ containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(
+ WindowedValue.of(
+ 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
+ WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
+ }
+
+ @Test
+ public void processElementSideInputReadyAllWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(true);
+
+ ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+ }
+
+ @Test
+ public void processElementNoSideInputs() {
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ // Should preserve the compressed representation when there's no side inputs.
+ assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+ }
+
+ /** Tests that a call to onTimer gets delegated. */
+ @Test
+ public void testOnTimerCalled() {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ String timerId = "fooTimer";
+ IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
+ Instant timestamp = new Instant(72);
+
+ // Mocking is not easily compatible with annotation analysis, so we manually record
+ // the method call.
+ runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+
+ assertThat(
+ underlying.firedTimers,
+ contains(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), window),
+ timestamp,
+ TimeDomain.EVENT_TIME)));
+ }
+
+ private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ List<WindowedValue<InputT>> inputElems;
+ List<TimerData> firedTimers;
+ private boolean started = false;
+ private boolean finished = false;
+
+ @Override
+ public void startBundle() {
+ started = true;
+ inputElems = new ArrayList<>();
+ firedTimers = new ArrayList<>();
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ inputElems.add(elem);
+ }
+
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ firedTimers.add(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
+ timestamp,
+ timeDomain));
+ }
+
+ @Override
+ public void finishBundle() {
+ finished = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 131716f..bab7b2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -85,7 +86,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
aggregatorChanges,
windowingStrategy);
PushbackSideInputDoFnRunner<InputT, OutputT> runner =
- PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+ SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
try {
runner.startBundle();
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 5496f71..8a09286 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -119,6 +120,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final OutputManagerFactory<OutputT> outputManagerFactory;
+ protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
protected transient SideInputHandler sideInputHandler;
@@ -269,7 +271,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
ExecutionContext.StepContext stepContext = createStepContext();
- DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
+ doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(),
doFn,
sideInputReader,
@@ -320,7 +322,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
pushbackDoFnRunner =
- PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
@Override
@@ -362,9 +364,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public final void processElement(
StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
- pushbackDoFnRunner.processElement(streamRecord.getValue());
- pushbackDoFnRunner.finishBundle();
+ doFnRunner.startBundle();
+ doFnRunner.processElement(streamRecord.getValue());
+ doFnRunner.finishBundle();
}
private void setPushedBackWatermark(long watermark) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 1a636c9..40f70e4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -142,7 +142,7 @@ public class SplittableDoFnOperator<
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
- pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
(String) stateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 7b899f4..9b2136c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -108,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
@Override
public void fireTimer(InternalTimer<?, TimerData> timer) {
- pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<K, InputT>timersWorkItem(
(K) stateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));