You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:25 UTC
[1/7] beam git commit: Extracts interface from PushbackSideInputDoFnRunner
Repository: beam
Updated Branches:
refs/heads/master a9bcc8b15 -> e0df7d85e
Extracts interface from PushbackSideInputDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e1a2675
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e1a2675
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e1a2675
Branch: refs/heads/master
Commit: 7e1a2675699ef14291e8c112010be66fff4b8581
Parents: 1cc16b0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:41:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 3 +-
.../core/PushbackSideInputDoFnRunner.java | 106 +------
.../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++
.../core/PushbackSideInputDoFnRunnerTest.java | 282 -------------------
.../SimplePushbackSideInputDoFnRunnerTest.java | 282 +++++++++++++++++++
.../beam/runners/direct/ParDoEvaluator.java | 3 +-
.../wrappers/streaming/DoFnOperator.java | 12 +-
.../streaming/SplittableDoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
9 files changed, 424 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index bad5be2..52d1d43 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -48,6 +48,7 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StatefulDoFnRunner;
@@ -368,7 +369,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
pushbackDoFnRunner =
- PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 4ad20b5..bab1dc7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -17,113 +17,35 @@
*/
package org.apache.beam.runners.core;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
/**
- * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
- * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ * Interface for runners of {@link DoFn}'s that support pushback when reading side inputs,
+ * i.e. return elements that could not be processed because they require reading a side input
+ * window that is not ready.
*/
-public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- private final DoFnRunner<InputT, OutputT> underlying;
- private final Collection<PCollectionView<?>> views;
- private final ReadyCheckingSideInputReader sideInputReader;
-
- private Set<BoundedWindow> notReadyWindows;
-
- public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
- }
-
- private PushbackSideInputDoFnRunner(
- DoFnRunner<InputT, OutputT> underlying,
- Collection<PCollectionView<?>> views,
- ReadyCheckingSideInputReader sideInputReader) {
- this.underlying = underlying;
- this.views = views;
- this.sideInputReader = sideInputReader;
- }
-
- @Override
- public void startBundle() {
- notReadyWindows = new HashSet<>();
- underlying.startBundle();
- }
+public interface PushbackSideInputDoFnRunner<InputT, OutputT> {
+ /** Calls the underlying {@link DoFn.StartBundle} method. */
+ void startBundle();
/**
- * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+ * Call the underlying {@link DoFn.ProcessElement} method for the provided element
* for each window the element is in that is ready.
*
* @param elem the element to process in all ready windows
* @return each element that could not be processed because it requires a side input window
* that is not ready.
*/
- public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
- if (views.isEmpty()) {
- // When there are no side inputs, we can preserve the compressed representation.
- processElement(elem);
- return Collections.emptyList();
- }
- ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
- for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
- BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
- if (isReady(mainInputWindow)) {
- // When there are any side inputs, we have to process the element in each window
- // individually, to disambiguate access to per-window side inputs.
- processElement(windowElem);
- } else {
- notReadyWindows.add(mainInputWindow);
- pushedBack.add(windowElem);
- }
- }
- return pushedBack.build();
- }
-
- private boolean isReady(BoundedWindow mainInputWindow) {
- if (notReadyWindows.contains(mainInputWindow)) {
- return false;
- }
- for (PCollectionView<?> view : views) {
- BoundedWindow sideInputWindow =
- view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
- if (!sideInputReader.isReady(view, sideInputWindow)) {
- return false;
- }
- }
- return true;
- }
+ Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- underlying.processElement(elem);
- }
+ /** Calls the underlying {@link DoFn.OnTimer} method. */
+ void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain);
- @Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
- TimeDomain timeDomain) {
- underlying.onTimer(timerId, window, timestamp, timeDomain);
- }
-
- /**
- * Call the underlying {@link DoFnRunner#finishBundle()}.
- */
- @Override
- public void finishBundle() {
- notReadyWindows = null;
- underlying.finishBundle();
- }
+ /** Calls the underlying {@link DoFn.FinishBundle} method. */
+ void finishBundle();
}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..50d301b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
+ implements PushbackSideInputDoFnRunner<InputT, OutputT> {
+ private final DoFnRunner<InputT, OutputT> underlying;
+ private final Collection<PCollectionView<?>> views;
+ private final ReadyCheckingSideInputReader sideInputReader;
+
+ private Set<BoundedWindow> notReadyWindows;
+
+ public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ return new SimplePushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+ }
+
+ private SimplePushbackSideInputDoFnRunner(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ this.underlying = underlying;
+ this.views = views;
+ this.sideInputReader = sideInputReader;
+ }
+
+ @Override
+ public void startBundle() {
+ notReadyWindows = new HashSet<>();
+ underlying.startBundle();
+ }
+
+ @Override
+ public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+ if (views.isEmpty()) {
+ // When there are no side inputs, we can preserve the compressed representation.
+ underlying.processElement(elem);
+ return Collections.emptyList();
+ }
+ ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+ for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+ BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+ if (isReady(mainInputWindow)) {
+ // When there are any side inputs, we have to process the element in each window
+ // individually, to disambiguate access to per-window side inputs.
+ underlying.processElement(windowElem);
+ } else {
+ notReadyWindows.add(mainInputWindow);
+ pushedBack.add(windowElem);
+ }
+ }
+ return pushedBack.build();
+ }
+
+ private boolean isReady(BoundedWindow mainInputWindow) {
+ if (notReadyWindows.contains(mainInputWindow)) {
+ return false;
+ }
+ for (PCollectionView<?> view : views) {
+ BoundedWindow sideInputWindow =
+ view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+ if (!sideInputReader.isReady(view, sideInputWindow)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ underlying.onTimer(timerId, window, timestamp, timeDomain);
+ }
+
+ @Override
+ public void finishBundle() {
+ notReadyWindows = null;
+ underlying.finishBundle();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
deleted file mode 100644
index cb057b8..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link PushbackSideInputDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class PushbackSideInputDoFnRunnerTest {
- @Mock private ReadyCheckingSideInputReader reader;
- private TestDoFnRunner<Integer, Integer> underlying;
- private PCollectionView<Integer> singletonView;
-
- @Rule
- public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
- singletonView =
- created
- .apply(Window.into(new IdentitySideInputWindowFn()))
- .apply(Sum.integersGlobally().asSingletonView());
-
- underlying = new TestDoFnRunner<>();
- }
-
- private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
- ImmutableList<PCollectionView<?>> views) {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- PushbackSideInputDoFnRunner.create(underlying, views, reader);
- runner.startBundle();
- return runner;
- }
-
- @Test
- public void startFinishBundleDelegates() {
- PushbackSideInputDoFnRunner runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- assertThat(underlying.started, is(true));
- assertThat(underlying.finished, is(false));
- runner.finishBundle();
- assertThat(underlying.finished, is(true));
- }
-
- @Test
- public void processElementSideInputNotReady() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> oneWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> oneWindowPushback =
- runner.processElementInReadyWindows(oneWindow);
- assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadyMultipleWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadySomeWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
- .thenReturn(false);
- when(
- reader.isReady(
- Mockito.eq(singletonView),
- org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
- .thenReturn(true);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
- IntervalWindow bigWindow =
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
- PaneInfo.NO_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(
- multiWindowPushback,
- containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
- assertThat(
- underlying.inputElems,
- containsInAnyOrder(
- WindowedValue.of(
- 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
- WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
- }
-
- @Test
- public void processElementSideInputReadyAllWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(true);
-
- ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
- PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- assertThat(
- underlying.inputElems,
- containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
- }
-
- @Test
- public void processElementNoSideInputs() {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of());
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- // Should preserve the compressed representation when there's no side inputs.
- assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
- }
-
- /** Tests that a call to onTimer gets delegated. */
- @Test
- public void testOnTimerCalled() {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of());
-
- String timerId = "fooTimer";
- IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
- Instant timestamp = new Instant(72);
-
- // Mocking is not easily compatible with annotation analysis, so we manually record
- // the method call.
- runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
-
- assertThat(
- underlying.firedTimers,
- contains(
- TimerData.of(
- timerId,
- StateNamespaces.window(IntervalWindow.getCoder(), window),
- timestamp,
- TimeDomain.EVENT_TIME)));
- }
-
- private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- List<WindowedValue<InputT>> inputElems;
- List<TimerData> firedTimers;
- private boolean started = false;
- private boolean finished = false;
-
- @Override
- public void startBundle() {
- started = true;
- inputElems = new ArrayList<>();
- firedTimers = new ArrayList<>();
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- inputElems.add(elem);
- }
-
- @Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
- TimeDomain timeDomain) {
- firedTimers.add(
- TimerData.of(
- timerId,
- StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
- timestamp,
- timeDomain));
- }
-
- @Override
- public void finishBundle() {
- finished = true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..ba3f926
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link SimplePushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class SimplePushbackSideInputDoFnRunnerTest {
+ @Mock private ReadyCheckingSideInputReader reader;
+ private TestDoFnRunner<Integer, Integer> underlying;
+ private PCollectionView<Integer> singletonView;
+
+ @Rule
+ public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+ singletonView =
+ created
+ .apply(Window.into(new IdentitySideInputWindowFn()))
+ .apply(Sum.integersGlobally().asSingletonView());
+
+ underlying = new TestDoFnRunner<>();
+ }
+
+ private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+ ImmutableList<PCollectionView<?>> views) {
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ SimplePushbackSideInputDoFnRunner.create(underlying, views, reader);
+ runner.startBundle();
+ return runner;
+ }
+
+ @Test
+ public void startFinishBundleDelegates() {
+ PushbackSideInputDoFnRunner runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ assertThat(underlying.started, is(true));
+ assertThat(underlying.finished, is(false));
+ runner.finishBundle();
+ assertThat(underlying.finished, is(true));
+ }
+
+ @Test
+ public void processElementSideInputNotReady() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> oneWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> oneWindowPushback =
+ runner.processElementInReadyWindows(oneWindow);
+ assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadyMultipleWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadySomeWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+ .thenReturn(false);
+ when(
+ reader.isReady(
+ Mockito.eq(singletonView),
+ org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+ .thenReturn(true);
+
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+ IntervalWindow bigWindow =
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+ PaneInfo.NO_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(
+ multiWindowPushback,
+ containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(
+ WindowedValue.of(
+ 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
+ WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
+ }
+
+ @Test
+ public void processElementSideInputReadyAllWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(true);
+
+ ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+ }
+
+ @Test
+ public void processElementNoSideInputs() {
+ SimplePushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ // Should preserve the compressed representation when there's no side inputs.
+ assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+ }
+
+ /** Tests that a call to onTimer gets delegated. */
+ @Test
+ public void testOnTimerCalled() {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ String timerId = "fooTimer";
+ IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16));
+ Instant timestamp = new Instant(72);
+
+ // Mocking is not easily compatible with annotation analysis, so we manually record
+ // the method call.
+ runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+
+ assertThat(
+ underlying.firedTimers,
+ contains(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), window),
+ timestamp,
+ TimeDomain.EVENT_TIME)));
+ }
+
+ private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ List<WindowedValue<InputT>> inputElems;
+ List<TimerData> firedTimers;
+ private boolean started = false;
+ private boolean finished = false;
+
+ @Override
+ public void startBundle() {
+ started = true;
+ inputElems = new ArrayList<>();
+ firedTimers = new ArrayList<>();
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ inputElems.add(elem);
+ }
+
+ @Override
+ public void onTimer(String timerId, BoundedWindow window, Instant timestamp,
+ TimeDomain timeDomain) {
+ firedTimers.add(
+ TimerData.of(
+ timerId,
+ StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
+ timestamp,
+ timeDomain));
+ }
+
+ @Override
+ public void finishBundle() {
+ finished = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 131716f..bab7b2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -85,7 +86,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
aggregatorChanges,
windowingStrategy);
PushbackSideInputDoFnRunner<InputT, OutputT> runner =
- PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+ SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
try {
runner.startBundle();
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 5496f71..8a09286 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -119,6 +120,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final OutputManagerFactory<OutputT> outputManagerFactory;
+ protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
protected transient SideInputHandler sideInputHandler;
@@ -269,7 +271,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
ExecutionContext.StepContext stepContext = createStepContext();
- DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
+ doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(),
doFn,
sideInputReader,
@@ -320,7 +322,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
pushbackDoFnRunner =
- PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
@Override
@@ -362,9 +364,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public final void processElement(
StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
- pushbackDoFnRunner.processElement(streamRecord.getValue());
- pushbackDoFnRunner.finishBundle();
+ doFnRunner.startBundle();
+ doFnRunner.processElement(streamRecord.getValue());
+ doFnRunner.finishBundle();
}
private void setPushedBackWatermark(long watermark) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 1a636c9..40f70e4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -142,7 +142,7 @@ public class SplittableDoFnOperator<
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
- pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
(String) stateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 7b899f4..9b2136c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -108,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
@Override
public void fireTimer(InternalTimer<?, TimerData> timer) {
- pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<K, InputT>timersWorkItem(
(K) stateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
[7/7] beam git commit: This closes #2556
Posted by jk...@apache.org.
This closes #2556
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0df7d85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0df7d85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0df7d85
Branch: refs/heads/master
Commit: e0df7d85e80eac71f875663512bc293a0529460f
Parents: a9bcc8b 6ac3ac5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Apr 18 18:02:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:25 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 3 +-
.../apache/beam/runners/core/DoFnRunners.java | 32 +++
.../beam/runners/core/ProcessFnRunner.java | 127 +++++++++
.../core/PushbackSideInputDoFnRunner.java | 106 +------
.../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++
.../beam/runners/core/SplittableParDo.java | 110 +++++---
.../core/PushbackSideInputDoFnRunnerTest.java | 282 -------------------
.../SimplePushbackSideInputDoFnRunnerTest.java | 282 +++++++++++++++++++
.../beam/runners/core/SplittableParDoTest.java | 90 +++---
...ecycleManagerRemovingTransformEvaluator.java | 6 +-
.../beam/runners/direct/ParDoEvaluator.java | 127 ++++++---
.../runners/direct/ParDoEvaluatorFactory.java | 13 +-
...littableProcessElementsEvaluatorFactory.java | 106 +++++--
.../direct/StatefulParDoEvaluatorFactory.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 4 +-
...leManagerRemovingTransformEvaluatorTest.java | 8 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 7 +-
runners/flink/runner/pom.xml | 3 +-
.../wrappers/streaming/DoFnOperator.java | 12 +-
.../streaming/SplittableDoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
...esSplittableParDoWithWindowedSideInputs.java | 26 ++
.../beam/sdk/transforms/SplittableDoFnTest.java | 104 +++++--
23 files changed, 993 insertions(+), 578 deletions(-)
----------------------------------------------------------------------
[3/7] beam git commit: Separates side input test and side output test
Posted by jk...@apache.org.
Separates side input test and side output test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a51bdd26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a51bdd26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a51bdd26
Branch: refs/heads/master
Commit: a51bdd266f9c877cb407de986a465fc9c7de76ff
Parents: a9bcc8b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:38:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/SplittableDoFnTest.java | 63 ++++++++++++++------
1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a51bdd26/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 9e8c12e..30329f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -61,7 +62,7 @@ import org.junit.runners.JUnit4;
* Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior.
*/
@RunWith(JUnit4.class)
-public class SplittableDoFnTest {
+public class SplittableDoFnTest implements Serializable {
static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
@ProcessElement
@@ -216,22 +217,18 @@ public class SplittableDoFnTest {
p.run();
}
- private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+ private static class SDFWithSideInput extends DoFn<Integer, String> {
private final PCollectionView<String> sideInput;
- private final TupleTag<String> additionalOutput;
- private SDFWithSideInputsAndOutputs(
- PCollectionView<String> sideInput, TupleTag<String> additionalOutput) {
+ private SDFWithSideInput(PCollectionView<String> sideInput) {
this.sideInput = sideInput;
- this.additionalOutput = additionalOutput;
}
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
String side = c.sideInput(sideInput);
- c.output("main:" + side + ":" + c.element());
- c.output(additionalOutput, "additional:" + side + ":" + c.element());
+ c.output(side + ":" + c.element());
}
@GetInitialRestriction
@@ -242,27 +239,55 @@ public class SplittableDoFnTest {
@Test
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
- public void testSideInputsAndOutputs() throws Exception {
-
+ public void testSideInput() throws Exception {
PCollectionView<String> sideInput =
p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
- TupleTag<String> mainOutputTag = new TupleTag<>("main");
- TupleTag<String> additionalOutputTag = new TupleTag<>("additional");
+
+ PCollection<String> res =
+ p.apply("input", Create.of(0, 1, 2))
+ .apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+ PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2"));
+
+ p.run();
+ }
+
+ private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
+ private final TupleTag<String> additionalOutput;
+
+ private SDFWithAdditionalOutput(TupleTag<String> additionalOutput) {
+ this.additionalOutput = additionalOutput;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
+ c.output("main:" + c.element());
+ c.output(additionalOutput, "additional:" + c.element());
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(Integer value) {
+ return new OffsetRange(0, 1);
+ }
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesSplittableParDo.class})
+ public void testAdditionalOutput() throws Exception {
+ TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
+ TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};
PCollectionTuple res =
p.apply("input", Create.of(0, 1, 2))
.apply(
- ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag))
- .withSideInputs(sideInput)
+ ParDo.of(new SDFWithAdditionalOutput(additionalOutputTag))
.withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
- res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
- res.get(additionalOutputTag).setCoder(StringUtf8Coder.of());
PAssert.that(res.get(mainOutputTag))
- .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
+ .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
PAssert.that(res.get(additionalOutputTag))
- .containsInAnyOrder(
- Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2"));
+ .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));
p.run();
}
[5/7] beam git commit: Creates ProcessFnRunner and wires it through
ParDoEvaluator
Posted by jk...@apache.org.
Creates ProcessFnRunner and wires it through ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b93de58f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b93de58f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b93de58f
Branch: refs/heads/master
Commit: b93de58f5a3a10877997815a793725cb0e53cc2d
Parents: 7e1a267
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:52:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunners.java | 32 +++++
.../beam/runners/core/ProcessFnRunner.java | 127 +++++++++++++++++++
.../beam/runners/direct/ParDoEvaluator.java | 114 +++++++++++++----
.../runners/direct/ParDoEvaluatorFactory.java | 11 +-
...littableProcessElementsEvaluatorFactory.java | 106 ++++++++++++----
.../direct/StatefulParDoEvaluatorFactory.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 4 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 3 +-
8 files changed, 341 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..8501e72 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.runners.core;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -26,10 +28,12 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
@@ -146,4 +150,32 @@ public class DoFnRunners {
stateCleaner,
droppedDueToLateness);
}
+
+ public static <InputT, OutputT, RestrictionT>
+ ProcessFnRunner<InputT, OutputT, RestrictionT>
+ newProcessFnRunner(
+ ProcessFn<InputT, OutputT, RestrictionT, ?> fn,
+ PipelineOptions options,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ StepContext stepContext,
+ AggregatorFactory aggregatorFactory,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ return new ProcessFnRunner<>(
+ simpleRunner(
+ options,
+ fn,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorFactory,
+ windowingStrategy),
+ views,
+ sideInputReader);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
new file mode 100644
index 0000000..3ae3f50
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.SplittableParDo.ProcessFn;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/** Runs a {@link ProcessFn} by constructing the appropriate contexts and passing them in. */
+public class ProcessFnRunner<InputT, OutputT, RestrictionT>
+ implements PushbackSideInputDoFnRunner<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+ private final DoFnRunner<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ underlying;
+ private final Collection<PCollectionView<?>> views;
+ private final ReadyCheckingSideInputReader sideInputReader;
+
+ ProcessFnRunner(
+ DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ this.underlying = underlying;
+ this.views = views;
+ this.sideInputReader = sideInputReader;
+ }
+
+ @Override
+ public void startBundle() {
+ underlying.startBundle();
+ }
+
+ @Override
+ public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
+ processElementInReadyWindows(
+ WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ windowedKWI) {
+ checkTrivialOuterWindows(windowedKWI);
+ BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
+ if (!isReady(window)) {
+ return Collections.singletonList(windowedKWI);
+ }
+ underlying.processElement(windowedKWI);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void finishBundle() {
+ underlying.finishBundle();
+ }
+
+ @Override
+ public void onTimer(
+ String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
+ }
+
+ private static <T> void checkTrivialOuterWindows(
+ WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
+ // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or ValueInGlobalWindow)
+ Collection<? extends BoundedWindow> outerWindows = windowedKWI.getWindows();
+ if (!outerWindows.isEmpty()) {
+ checkArgument(
+ outerWindows.size() == 1,
+ "The KeyedWorkItem itself must not be in multiple windows, but was in: %s",
+ outerWindows);
+ BoundedWindow onlyWindow = Iterables.getOnlyElement(outerWindows);
+ checkArgument(
+ onlyWindow instanceof GlobalWindow,
+ "KeyedWorkItem must be in the Global window, but was in: %s",
+ onlyWindow);
+ }
+ }
+
+ private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) {
+ if (Iterables.isEmpty(kwi.elementsIterable())) {
+ // ProcessFn sets only a single timer.
+ TimerData timer = Iterables.getOnlyElement(kwi.timersIterable());
+ return ((WindowNamespace) timer.getNamespace()).getWindow();
+ } else {
+ // KWI must have a single element in elementsIterable, because it follows a GBK by a
+ // uniquely generated key.
+ // Additionally, windows must be exploded before GBKIntoKeyedWorkItems, so there's also
+ // only a single window.
+ WindowedValue<T> value = Iterables.getOnlyElement(kwi.elementsIterable());
+ return Iterables.getOnlyElement(value.getWindows());
+ }
+ }
+
+ private boolean isReady(BoundedWindow mainInputWindow) {
+ for (PCollectionView<?> view : views) {
+ BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+ if (!sideInputReader.isReady(view, sideInputWindow)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index bab7b2c..cab11db 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -43,6 +44,50 @@ import org.apache.beam.sdk.values.TupleTag;
class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
+ public interface DoFnRunnerFactory<InputT, OutputT> {
+ PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+ PipelineOptions options,
+ DoFn<InputT, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ ReadyCheckingSideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ DirectStepContext stepContext,
+ AggregatorContainer.Mutator aggregatorChanges,
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
+ }
+
+ public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
+ return new DoFnRunnerFactory<InputT, OutputT>() {
+ @Override
+ public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+ PipelineOptions options,
+ DoFn<InputT, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ ReadyCheckingSideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ DirectStepContext stepContext,
+ AggregatorContainer.Mutator aggregatorChanges,
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+ DoFnRunner<InputT, OutputT> underlying =
+ DoFnRunners.simpleRunner(
+ options,
+ fn,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorChanges,
+ windowingStrategy);
+ return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+ }
+ };
+ }
+
public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
DirectStepContext stepContext,
@@ -53,9 +98,43 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
- Map<TupleTag<?>, PCollection<?>> outputs) {
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
+ BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
+
+ ReadyCheckingSideInputReader sideInputReader =
+ evaluationContext.createSideInputReader(sideInputs);
+
+ PushbackSideInputDoFnRunner<InputT, OutputT> runner = runnerFactory.createRunner(
+ evaluationContext.getPipelineOptions(),
+ fn,
+ sideInputs,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorChanges,
+ windowingStrategy);
+
+ return create(runner, stepContext, application, aggregatorChanges, outputManager);
+ }
+
+ public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+ PushbackSideInputDoFnRunner<InputT, OutputT> runner,
+ DirectStepContext stepContext,
+ AppliedPTransform<?, ?, ?> application,
+ AggregatorContainer.Mutator aggregatorChanges,
+ BundleOutputManager outputManager) {
+ return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+ }
+
+ static BundleOutputManager createOutputManager(
+ EvaluationContext evaluationContext,
+ StructuralKey<?> key,
+ Map<TupleTag<?>, PCollection<?>> outputs) {
Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
// Just trust the context's decision as to whether the output should be keyed.
@@ -69,32 +148,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
}
}
- BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
-
- ReadyCheckingSideInputReader sideInputReader =
- evaluationContext.createSideInputReader(sideInputs);
-
- DoFnRunner<InputT, OutputT> underlying =
- DoFnRunners.simpleRunner(
- evaluationContext.getPipelineOptions(),
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- aggregatorChanges,
- windowingStrategy);
- PushbackSideInputDoFnRunner<InputT, OutputT> runner =
- SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
- try {
- runner.startBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
-
- return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+ return BundleOutputManager.create(outputBundles);
}
////////////////////////////////////////////////////////////////////////////////////////////////
@@ -119,6 +173,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
this.stepContext = stepContext;
this.aggregatorChanges = aggregatorChanges;
this.unprocessedElements = ImmutableList.builder();
+
+ try {
+ fnRunner.startBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
}
public BundleOutputManager getOutputManager() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 93f204a..b00c2b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -43,9 +43,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
private final EvaluationContext evaluationContext;
+ private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
- ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
+ ParDoEvaluatorFactory(
+ EvaluationContext evaluationContext,
+ ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
this.evaluationContext = evaluationContext;
+ this.runnerFactory = runnerFactory;
fnClones =
CacheBuilder.newBuilder()
.build(
@@ -148,7 +152,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
sideInputs,
mainOutputTag,
additionalOutputTags,
- pcollections(application.getOutputs()));
+ pcollections(application.getOutputs()),
+ runnerFactory);
} catch (Exception e) {
try {
fnManager.remove();
@@ -162,7 +167,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
}
}
- private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
+ static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
pcs.put(output.getKey(), (PCollection<?>) output.getValue());
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 00b16dd..7efdb52 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -18,25 +18,34 @@
package org.apache.beam.runners.direct;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ElementAndRestriction;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -51,7 +60,11 @@ class SplittableProcessElementsEvaluatorFactory<
SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
- this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+ this.delegateFactory =
+ new ParDoEvaluatorFactory<>(
+ evaluationContext,
+ SplittableProcessElementsEvaluatorFactory
+ .<InputT, OutputT, RestrictionT>processFnRunnerFactory());
}
@Override
@@ -82,12 +95,12 @@ class SplittableProcessElementsEvaluatorFactory<
final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
application.getTransform();
- SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+ ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
transform.newProcessFn(transform.getFn());
DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
processFn =
- ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+ ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
fnManager
.<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
get());
@@ -98,7 +111,7 @@ class SplittableProcessElementsEvaluatorFactory<
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(stepName, stepName);
- ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
parDoEvaluator =
delegateFactory.createParDoEvaluator(
application,
@@ -127,34 +140,36 @@ class SplittableProcessElementsEvaluatorFactory<
}
});
- final OutputManager outputManager = parDoEvaluator.getOutputManager();
+ OutputWindowedValue<OutputT> outputWindowedValue =
+ new OutputWindowedValue<OutputT>() {
+ private final OutputManager outputManager = parDoEvaluator.getOutputManager();
+
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(
+ transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ };
processFn.setProcessElementInvoker(
new OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>(
transform.getFn(),
evaluationContext.getPipelineOptions(),
- new OutputWindowedValue<OutputT>() {
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(
- transform.getMainOutputTag(),
- WindowedValue.of(output, timestamp, windows, pane));
- }
-
- @Override
- public <AdditionalOutputT> void outputWindowedValue(
- TupleTag<AdditionalOutputT> tag,
- AdditionalOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
- }
- },
+ outputWindowedValue,
evaluationContext.createSideInputReader(transform.getSideInputs()),
// TODO: For better performance, use a higher-level executor?
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
@@ -163,4 +178,41 @@ class SplittableProcessElementsEvaluatorFactory<
return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
}
+
+ private static <InputT, OutputT, RestrictionT>
+ ParDoEvaluator.DoFnRunnerFactory<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ processFnRunnerFactory() {
+ return new ParDoEvaluator.DoFnRunnerFactory<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+ @Override
+ public PushbackSideInputDoFnRunner<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ createRunner(
+ PipelineOptions options,
+ DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ ReadyCheckingSideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ DirectExecutionContext.DirectStepContext stepContext,
+ AggregatorContainer.Mutator aggregatorChanges,
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+ ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+ (ProcessFn) fn;
+ return DoFnRunners.newProcessFnRunner(
+ processFn,
+ options,
+ sideInputs,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorChanges,
+ windowingStrategy);
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index be77ea1..8793ae8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -65,7 +65,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
- this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+ this.delegateFactory =
+ new ParDoEvaluatorFactory<>(
+ evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory());
this.cleanupRegistry =
CacheBuilder.newBuilder()
.weakValues()
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index ae7ad93..d06c460 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -52,7 +52,9 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
.put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
- .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
+ .put(
+ ParDo.MultiOutput.class,
+ new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory()))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
.put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
.put(WriteView.class, new ViewEvaluatorFactory(ctxt))
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2be0f9d..e99e4bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -169,7 +169,8 @@ public class ParDoEvaluatorTest {
ImmutableList.<PCollectionView<?>>of(singletonView),
mainOutputTag,
additionalOutputTags,
- ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output),
+ ParDoEvaluator.<Integer, Integer>defaultRunnerFactory());
}
private static class RecorderFn extends DoFn<Integer, Integer> {
[4/7] beam git commit: Minor cleanups in ParDoEvaluator
Posted by jk...@apache.org.
Minor cleanups in ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d
Branch: refs/heads/master
Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6
Parents: 3fd8890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 12:25:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
...oFnLifecycleManagerRemovingTransformEvaluator.java | 6 +++---
.../apache/beam/runners/direct/ParDoEvaluator.java | 14 +++++---------
.../beam/runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../SplittableProcessElementsEvaluatorFactory.java | 2 +-
...ifecycleManagerRemovingTransformEvaluatorTest.java | 8 ++++----
.../beam/runners/direct/ParDoEvaluatorTest.java | 4 ++--
6 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 9bcd569..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
private static final Logger LOG =
LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
- private final ParDoEvaluator<InputT, ?> underlying;
+ private final ParDoEvaluator<InputT> underlying;
private final DoFnLifecycleManager lifecycleManager;
public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
- ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+ ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
}
private DoFnLifecycleManagerRemovingTransformEvaluator(
- ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+ ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
this.underlying = underlying;
this.lifecycleManager = lifecycleManager;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 49d0723..131716f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
- public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
+ public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
DirectStepContext stepContext,
AppliedPTransform<?, ?, ?> application,
@@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
throw UserCodeException.wrap(e);
}
- return new ParDoEvaluator<>(
- evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
+ return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
}
////////////////////////////////////////////////////////////////////////////////////////////////
- private final EvaluationContext evaluationContext;
private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
private final AppliedPTransform<?, ?, ?> transform;
private final AggregatorContainer.Mutator aggregatorChanges;
@@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
private ParDoEvaluator(
- EvaluationContext evaluationContext,
PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
AppliedPTransform<?, ?, ?> transform,
AggregatorContainer.Mutator aggregatorChanges,
BundleOutputManager outputManager,
DirectStepContext stepContext) {
- this.evaluationContext = evaluationContext;
this.fnRunner = fnRunner;
this.transform = transform;
this.outputManager = outputManager;
@@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
- StepTransformResult.Builder resultBuilder;
+ StepTransformResult.Builder<InputT> resultBuilder;
CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
if (state != null) {
resultBuilder =
- StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+ StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
.withState(state);
} else {
resultBuilder = StepTransformResult.withoutHold(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 0372295..93f204a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
fnManager);
}
- ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+ ParDoEvaluator<InputT> createParDoEvaluator(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64cef35..00b16dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory<
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(stepName, stepName);
- ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
parDoEvaluator =
delegateFactory.createParDoEvaluator(
application,
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index d046ce5..1ac4d6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void delegatesToUnderlying() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
DoFn<?, ?> original = lifecycleManager.get();
TransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInProcessElement() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
DoFn<?, ?> original = lifecycleManager.get();
@@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInOnTimer() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class)
.when(underlying)
.onTimer(any(TimerData.class), any(BoundedWindow.class));
@@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInFinishBundle() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class).when(underlying).finishBundle();
DoFn<?, ?> original = lifecycleManager.get();
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 65a1248..2be0f9d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
- ParDoEvaluator<Integer, Integer> evaluator =
+ ParDoEvaluator<Integer> evaluator =
createEvaluator(singletonView, fn, output);
IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
@@ -130,7 +130,7 @@ public class ParDoEvaluatorTest {
WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
}
- private ParDoEvaluator<Integer, Integer> createEvaluator(
+ private ParDoEvaluator<Integer> createEvaluator(
PCollectionView<Integer> singletonView,
RecorderFn fn,
PCollection<Integer> output) {
[2/7] beam git commit: ProcessFn remembers more info about its
application context
Posted by jk...@apache.org.
ProcessFn remembers more info about its application context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fd88901
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fd88901
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fd88901
Branch: refs/heads/master
Commit: 3fd889015afa8528801d2c35c8c9f72b944ea472
Parents: a51bdd2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:39:51 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDo.java | 35 +++++++++++++++-----
.../beam/runners/core/SplittableParDoTest.java | 8 ++++-
2 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 9cc965a..44db1f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -115,7 +115,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
fn,
input.getCoder(),
restrictionCoder,
- input.getWindowingStrategy(),
+ (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
parDo.getSideInputs(),
parDo.getMainOutputTag(),
parDo.getAdditionalOutputTags()));
@@ -185,7 +185,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
private final Coder<RestrictionT> restrictionCoder;
- private final WindowingStrategy<?, ?> windowingStrategy;
+ private final WindowingStrategy<InputT, ?> windowingStrategy;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
@@ -202,7 +202,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
- WindowingStrategy<?, ?> windowingStrategy,
+ WindowingStrategy<InputT, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags) {
@@ -234,7 +234,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
DoFn<InputT, OutputT> fn) {
return new SplittableParDo.ProcessFn<>(
- fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+ fn, elementCoder, restrictionCoder, windowingStrategy);
}
@Override
@@ -351,7 +351,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
private final DoFn<InputT, OutputT> fn;
- private final Coder<? extends BoundedWindow> windowCoder;
+ private final Coder<InputT> elementCoder;
+ private final Coder<RestrictionT> restrictionCoder;
+ private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
private transient StateInternalsFactory<String> stateInternalsFactory;
private transient TimerInternalsFactory<String> timerInternalsFactory;
@@ -364,11 +366,16 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
- Coder<? extends BoundedWindow> windowCoder) {
+ WindowingStrategy<InputT, ?> inputWindowingStrategy) {
this.fn = fn;
- this.windowCoder = windowCoder;
+ this.elementCoder = elementCoder;
+ this.restrictionCoder = restrictionCoder;
+ this.inputWindowingStrategy = inputWindowingStrategy;
this.elementTag =
- StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+ StateTags.value(
+ "element",
+ WindowedValue.getFullCoder(
+ elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
this.restrictionTag = StateTags.value("restriction", restrictionCoder);
}
@@ -389,6 +396,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return fn;
}
+ public Coder<InputT> getElementCoder() {
+ return elementCoder;
+ }
+
+ public Coder<RestrictionT> getRestrictionCoder() {
+ return restrictionCoder;
+ }
+
+ public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
+ return inputWindowingStrategy;
+ }
+
@Setup
public void setup() throws Exception {
invoker = DoFnInvokers.invokerFor(fn);
http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 2c89543..5629635 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -51,11 +51,13 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -220,9 +222,13 @@ public class SplittableParDoTest {
int maxOutputsPerBundle,
Duration maxBundleDuration)
throws Exception {
+ // The exact windowing strategy doesn't matter in this test, but it should be able to
+ // encode IntervalWindows because that's what all tests here use.
+ WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
+ (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
final SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
new SplittableParDo.ProcessFn<>(
- fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+ fn, inputCoder, restrictionCoder, windowingStrategy);
this.tester = DoFnTester.of(processFn);
this.timerInternals = new InMemoryTimerInternals();
this.stateInternals = new TestInMemoryStateInternals<>("dummy");
[6/7] beam git commit: Explodes windows before GBKIKWI
Posted by jk...@apache.org.
Explodes windows before GBKIKWI
Also
* Adds a test for windowed side inputs that requires this
behavior.
* Adds a test category for SDF with windowed side input.
Runners should gradually implement this properly. For now,
only the direct runner implements it properly.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ac3ac50
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ac3ac50
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ac3ac50
Branch: refs/heads/master
Commit: 6ac3ac50fec2eb02927c0a07ca928967cfef5652
Parents: b93de58
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 11:28:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDo.java | 75 +++++++++---------
.../beam/runners/core/SplittableParDoTest.java | 82 +++++++-------------
runners/flink/runner/pom.xml | 3 +-
...esSplittableParDoWithWindowedSideInputs.java | 26 +++++++
.../beam/sdk/transforms/SplittableDoFnTest.java | 41 ++++++++++
5 files changed, 137 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 44db1f7..31d89ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,10 +19,8 @@ package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.UUID;
@@ -138,6 +136,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
.setCoder(splitCoder)
.apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
.setCoder(splitCoder)
+ // ProcessFn requires all input elements to be in a single window and have a single
+ // element per work item. This must precede the unique keying so each key has a single
+ // associated element.
+ .apply(
+ "Explode windows",
+ ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>()))
.apply(
"Assign unique key",
WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
@@ -158,6 +162,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
/**
+ * A {@link DoFn} that forces each of its outputs to be in a single window. Because it observes
+ * the window of its input element, the runner must invoke it separately for each window of a
+ * multi-window input, so each of its outputs is also in exactly one window.
+ */
+ private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> {
+ @ProcessElement
+ public void process(ProcessContext c, BoundedWindow window) {
+ c.output(c.element());
+ }
+ }
+
+ /**
* Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
* {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
*
@@ -317,6 +333,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
* by creating a tracker for the restriction and checkpointing/resuming processing later if
* necessary.
+ *
+ * <p>Takes {@link KeyedWorkItem} and assumes that the KeyedWorkItem contains a single element
+ * (or a single timer set by {@link ProcessFn} itself), in a single window. This is necessary
+ * because {@link ProcessFn} sets timers, and timers are namespaced to a single window, which
+ * should be the window of the input element.
+ *
+ * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
*/
@VisibleForTesting
public static class ProcessFn<
@@ -441,7 +464,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
// Subsequent calls are timer firings and the element has to be retrieved from the state.
TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
boolean isSeedCall = (timer == null);
- StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+ StateNamespace stateNamespace;
+ if (isSeedCall) {
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+ Iterables.getOnlyElement(c.element().elementsIterable());
+ BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
+ stateNamespace =
+ StateNamespaces.window(
+ (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
+ } else {
+ stateNamespace = timer.getNamespace();
+ }
+
ValueState<WindowedValue<InputT>> elementState =
stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
@@ -451,15 +485,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
if (isSeedCall) {
- // The element and restriction are available in c.element().
- // elementsIterable() will, by construction of SplittableParDo, contain the same value
- // potentially in several different windows. We implode this into a single WindowedValue
- // in order to simplify the rest of the code and avoid iterating over elementsIterable()
- // explicitly. The windows of this WindowedValue will be propagated to windows of the
- // output. This is correct because a splittable DoFn is not allowed to inspect the window
- // of its element.
WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
- implodeWindows(c.element().elementsIterable());
+ Iterables.getOnlyElement(c.element().elementsIterable());
WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
elementState.write(element);
elementAndRestriction =
@@ -498,32 +525,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
}
- /**
- * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
- * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
- * of the same value with the same timestamp, but different window sets.
- *
- * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
- * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
- * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
- * being delivered separately rather than all at once. It is also legal to do because splittable
- * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
- * set of windows of its input to its output.
- */
- private static <InputT, RestrictionT>
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
- Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
- Iterables.getFirst(values, null);
- checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
- ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
- for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
- windows.addAll(value.getWindows());
- }
- return WindowedValue.of(
- first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
- }
-
private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
return fn.new Context() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 5629635..1a44453 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
@@ -194,11 +195,6 @@ public class SplittableParDoTest {
// ------------------------------- Tests for ProcessFn ---------------------------------
- enum WindowExplosion {
- EXPLODE_WINDOWS,
- DO_NOT_EXPLODE_WINDOWS
- }
-
/**
* A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
* {@link DoFn.ProcessElement} calls).
@@ -293,24 +289,13 @@ public class SplittableParDoTest {
ElementAndRestriction.of(element, restriction),
currentProcessingTime,
GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
- WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
- void startElement(
- WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
- WindowExplosion explosion)
+ void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
throws Exception {
- switch (explosion) {
- case EXPLODE_WINDOWS:
- tester.processElement(
- KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
- break;
- case DO_NOT_EXPLODE_WINDOWS:
- tester.processElement(
- KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
- break;
- }
+ tester.processElement(
+ KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
}
/**
@@ -394,46 +379,39 @@ public class SplittableParDoTest {
}
@Test
- public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
- // Tests that ProcessFn correctly propagates windows and timestamp of the element
+ public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception {
+ // Tests that ProcessFn correctly propagates the window and timestamp of the element
// inside the KeyedWorkItem.
// The underlying DoFn is actually monolithic, so this doesn't test splitting.
DoFn<Integer, String> fn = new ToStringFn();
Instant base = Instant.now();
- IntervalWindow w1 =
+ IntervalWindow w =
new IntervalWindow(
base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
- IntervalWindow w2 =
- new IntervalWindow(
- base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
- IntervalWindow w3 =
- new IntervalWindow(
- base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
-
- for (WindowExplosion explosion : WindowExplosion.values()) {
- ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
- new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
- MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
- tester.startElement(
- WindowedValue.of(
- ElementAndRestriction.of(42, new SomeRestriction()),
- base,
- Arrays.asList(w1, w2, w3),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
- explosion);
-
- for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
- assertEquals(
- Arrays.asList(
- TimestampedValue.of("42a", base),
- TimestampedValue.of("42b", base),
- TimestampedValue.of("42c", base)),
- tester.peekOutputElementsInWindow(w));
- }
- }
+
+ ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ BigEndianIntegerCoder.of(),
+ SerializableCoder.of(SomeRestriction.class),
+ MAX_OUTPUTS_PER_BUNDLE,
+ MAX_BUNDLE_DURATION);
+ tester.startElement(
+ WindowedValue.of(
+ ElementAndRestriction.of(42, new SomeRestriction()),
+ base,
+ Collections.singletonList(w),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ assertEquals(
+ Arrays.asList(
+ TimestampedValue.of("42a", base),
+ TimestampedValue.of("42b", base),
+ TimestampedValue.of("42c", base)),
+ tester.peekOutputElementsInWindow(w));
}
private static class WatermarkUpdateFn extends DoFn<Instant, String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 95880f4..1e6452d 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -91,7 +91,8 @@
org.apache.beam.sdk.testing.UsesMapState,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
- org.apache.beam.sdk.testing.UsesTestStream
+ org.apache.beam.sdk.testing.UsesTestStream,
+ org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
new file mode 100644
index 0000000..2b1d673
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize splittable {@link ParDo} and use
+ * windowed side inputs.
+ */
+public interface UsesSplittableParDoWithWindowedSideInputs {}
http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 30329f4..a0f1fd3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
@@ -252,6 +253,46 @@ public class SplittableDoFnTest implements Serializable {
p.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesSplittableParDo.class,
+ UsesSplittableParDoWithWindowedSideInputs.class
+ })
+ public void testWindowedSideInput() throws Exception {
+ PCollection<Integer> mainInput =
+ p.apply("main",
+ Create.timestamped(
+ TimestampedValue.of(0, new Instant(0)),
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)),
+ TimestampedValue.of(3, new Instant(3)),
+ TimestampedValue.of(4, new Instant(4)),
+ TimestampedValue.of(5, new Instant(5)),
+ TimestampedValue.of(6, new Instant(6)),
+ TimestampedValue.of(7, new Instant(7))))
+ .apply("window 2", Window.<Integer>into(FixedWindows.of(Duration.millis(2))));
+
+ PCollectionView<String> sideInput =
+ p.apply("side",
+ Create.timestamped(
+ TimestampedValue.of("a", new Instant(0)),
+ TimestampedValue.of("b", new Instant(4))))
+ .apply("window 4", Window.<String>into(FixedWindows.of(Duration.millis(4))))
+ .apply("singleton", View.<String>asSingleton());
+
+ PCollection<String> res =
+ mainInput.apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput));
+
+ PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
+
+ p.run();
+
+ // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+ // properly access side inputs.
+    // TODO: also add test coverage when some of the windows of the side input are not ready.
+ }
+
private static class SDFWithAdditionalOutput extends DoFn<Integer, String> {
private final TupleTag<String> additionalOutput;