You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:51:58 UTC
[02/50] incubator-beam git commit: Replaces SideInputAccess with
SideInputReader
Replaces SideInputAccess with SideInputReader
Makes WindowingInternals.sideInput take the side input window
instead of the main input window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a0d0e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a0d0e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a0d0e1
Branch: refs/heads/gearpump-runner
Commit: 90a0d0e13fa0332df805b79b1dc64860d9590217
Parents: 8243fcd
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Nov 14 14:48:31 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 26 ++++++++++---
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 2 +-
.../runners/core/ReduceFnContextFactory.java | 27 +++++++++-----
.../beam/runners/core/ReduceFnRunner.java | 5 ++-
.../beam/runners/core/SideInputAccess.java | 31 ----------------
.../beam/runners/core/SimpleDoFnRunner.java | 32 +++++++---------
.../beam/runners/core/SimpleOldDoFnRunner.java | 11 +++---
.../core/WindowingInternalsAdapters.java | 21 ++++++++---
.../beam/runners/core/ReduceFnTester.java | 34 +++++------------
.../GroupAlsoByWindowEvaluatorFactory.java | 21 +++++++----
.../functions/FlinkDoFnFunction.java | 15 ++++----
.../functions/FlinkProcessContextBase.java | 9 +----
.../FlinkSingleOutputProcessContext.java | 1 -
.../runners/spark/translation/DoFnFunction.java | 5 +--
.../spark/translation/MultiDoFnFunction.java | 6 +--
.../spark/translation/SparkProcessContext.java | 39 ++++++++++++--------
.../apache/beam/sdk/transforms/DoFnTester.java | 2 +-
.../beam/sdk/util/WindowingInternals.java | 4 +-
19 files changed, 141 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 8fbfb03..eca4308 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -354,13 +354,26 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
@Override
- public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ public void outputWindowedValue(
+ KV<K, Iterable<V>> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
if (traceTuples) {
LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
}
- ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
- WindowedValue.of(output, timestamp, windows, pane)));
+ ApexGroupByKeyOperator.this.output.emit(
+ ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
}
@Override
@@ -379,8 +392,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
@Override
- public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
- Coder<T> elemCoder) throws IOException {
+ public <T> void writePCollectionViewData(
+ TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder)
+ throws IOException {
throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index bcc52d3..8b10813 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -88,7 +88,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
stateInternals,
timerInternals,
WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
- WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
+ WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
droppedDueToClosedWindow,
reduceFn,
c.getPipelineOptions());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 45c0eda..f8f6207 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -77,7 +77,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
stateInternals,
timerInternals,
WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
- WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
+ WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
droppedDueToClosedWindow,
reduceFn,
c.getPipelineOptions());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index d43fb8e..539126a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -62,7 +64,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private final StateInternals<K> stateInternals;
private final ActiveWindowSet<W> activeWindows;
private final TimerInternals timerInternals;
- private final SideInputAccess sideInputAccess;
+ private final SideInputReader sideInputReader;
private final PipelineOptions options;
ReduceFnContextFactory(
@@ -72,7 +74,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
StateInternals<K> stateInternals,
ActiveWindowSet<W> activeWindows,
TimerInternals timerInternals,
- SideInputAccess sideInputAccess,
+ SideInputReader sideInputReader,
PipelineOptions options) {
this.key = key;
this.reduceFn = reduceFn;
@@ -80,7 +82,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
this.stateInternals = stateInternals;
this.activeWindows = activeWindows;
this.timerInternals = timerInternals;
- this.sideInputAccess = sideInputAccess;
+ this.sideInputReader = sideInputReader;
this.options = options;
}
@@ -94,8 +96,14 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
return new StateAccessorImpl<K, W>(
- activeWindows, windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, stateContextFromComponents(options, sideInputAccess, window),
+ activeWindows,
+ windowingStrategy.getWindowFn().windowCoder(),
+ stateInternals,
+ stateContextFromComponents(
+ options,
+ sideInputReader,
+ window,
+ windowingStrategy.getWindowFn()),
style);
}
@@ -504,8 +512,9 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents(
@Nullable final PipelineOptions options,
- final SideInputAccess sideInputAccess,
- final W window) {
+ final SideInputReader sideInputReader,
+ final W mainInputWindow,
+ final WindowFn<?, W> windowFn) {
if (options == null) {
return StateContexts.nullContext();
} else {
@@ -518,12 +527,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public <T> T sideInput(PCollectionView<T> view) {
- return sideInputAccess.sideInput(view, window);
+ return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow));
}
@Override
public W window() {
- return window;
+ return mainInputWindow;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 023a77a..a686f46 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ActiveWindowSet;
import org.apache.beam.sdk.util.MergingActiveWindowSet;
import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
+import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -217,7 +218,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
StateInternals<K> stateInternals,
TimerInternals timerInternals,
OutputWindowedValue<KV<K, OutputT>> outputter,
- SideInputAccess sideInputAccess,
+ SideInputReader sideInputReader,
Aggregator<Long, Long> droppedDueToClosedWindow,
ReduceFn<K, InputT, OutputT, W> reduceFn,
PipelineOptions options) {
@@ -241,7 +242,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
this.contextFactory =
new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy,
- stateInternals, this.activeWindows, timerInternals, sideInputAccess, options);
+ stateInternals, this.activeWindows, timerInternals, sideInputReader, options);
this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
this.triggerRunner =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
deleted file mode 100644
index 7d64566..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Allows accessing the side inputs for a particular main input window.
- */
-public interface SideInputAccess {
- /**
- * Return the value of the side input for the window of a main input element.
- */
- <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c0f3a02..76aae8f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -142,7 +142,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
private void invokeProcessElement(WindowedValue<InputT> elem) {
- DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
+ final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
// This can contain user code. Wrap it in case it throws an exception.
try {
@@ -283,12 +283,10 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
return WindowedValue.of(output, timestamp, windows, pane);
}
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
if (!sideInputReader.contains(view)) {
throw new IllegalArgumentException("calling sideInput() with unknown view");
}
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
return sideInputReader.get(view, sideInputWindow);
}
@@ -432,7 +430,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
final DoFnContext<InputT, OutputT> context;
final WindowedValue<InputT> windowedValue;
- public DoFnProcessContext(
+ private DoFnProcessContext(
DoFn<InputT, OutputT> fn,
DoFnContext<InputT, OutputT> context,
WindowedValue<InputT> windowedValue) {
@@ -473,7 +471,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
"sideInput called when main input element is in multiple windows");
}
}
- return context.sideInput(view, window);
+ return context.sideInput(
+ view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
}
@Override
@@ -493,14 +492,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
}
- void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- context.outputWindowedValue(output, timestamp, windows, pane);
- }
-
@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
checkNotNull(tag, "Tag passed to sideOutput cannot be null");
@@ -628,7 +619,9 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {}
+ PaneInfo pane) {
+ throw new UnsupportedOperationException("A DoFn cannot output to a different window");
+ }
@Override
public <SideOutputT> void sideOutputWindowedValue(
@@ -636,11 +629,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
SideOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {}
+ PaneInfo pane) {
+ throw new UnsupportedOperationException(
+ "A DoFn cannot side output to a different window");
+ }
@Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- return context.sideInput(view, mainInputWindow);
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+ return context.sideInput(view, sideInputWindow);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 8efc27b..cbda791 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -251,12 +251,10 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
return WindowedValue.of(output, timestamp, windows, pane);
}
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
if (!sideInputReader.contains(view)) {
throw new IllegalArgumentException("calling sideInput() with unknown view");
}
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
return sideInputReader.get(view, sideInputWindow);
}
@@ -390,7 +388,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
"sideInput called when main input element is in multiple windows");
}
}
- return context.sideInput(view, window);
+ return context.sideInput(
+ view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
}
@Override
@@ -515,8 +514,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- return context.sideInput(view, mainInputWindow);
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+ return context.sideInput(view, sideInputWindow);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 1b47e2b..7f80844 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -20,21 +20,32 @@ package org.apache.beam.runners.core;
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
- * Adapters from {@link WindowingInternals} to {@link SideInputAccess} and {@link
+ * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link
* OutputWindowedValue}.
*/
public class WindowingInternalsAdapters {
- static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) {
- return new SideInputAccess() {
+ static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) {
+ return new SideInputReader() {
@Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- return windowingInternals.sideInput(view, mainInputWindow);
+ public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+ return windowingInternals.sideInput(view, sideInputWindow);
+ }
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException();
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 5f8424e..337be23 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -80,7 +80,6 @@ import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
@@ -105,7 +104,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private final WindowFn<Object, W> windowFn;
private final TestOutputWindowedValue testOutputter;
- private final TestSideInputAccess testSideInputAccess;
+ private final SideInputReader sideInputReader;
private final Coder<OutputT> outputCoder;
private final WindowingStrategy<Object, W> objectStrategy;
private final ExecutableTriggerStateMachine executableTriggerStateMachine;
@@ -291,7 +290,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
this.reduceFn = reduceFn;
this.windowFn = objectStrategy.getWindowFn();
this.testOutputter = new TestOutputWindowedValue();
- this.testSideInputAccess = new TestSideInputAccess(sideInputReader);
+ this.sideInputReader = sideInputReader;
this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
this.outputCoder = outputCoder;
this.options = options;
@@ -314,7 +313,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
stateInternals,
timerInternals,
testOutputter,
- testSideInputAccess,
+ sideInputReader,
droppedDueToClosedWindow,
reduceFn,
options);
@@ -522,8 +521,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
@Override
- public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ public void outputWindowedValue(
+ KV<String, OutputT> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
// Copy the output value (using coders) before capturing it.
KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow");
@@ -538,25 +540,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException();
- }
- }
-
- private class TestSideInputAccess implements SideInputAccess {
- private SideInputReader sideInputReader;
-
- private TestSideInputAccess(SideInputReader sideInputReader) {
- this.sideInputReader = sideInputReader;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- if (!sideInputReader.contains(view)) {
- throw new IllegalArgumentException("calling sideInput() with unknown view");
- }
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
- return sideInputReader.get(view, sideInputWindow);
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0e8adba..a5bb214 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
@@ -31,7 +30,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindo
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
-import org.apache.beam.runners.core.SideInputAccess;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -47,14 +45,13 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -176,12 +173,22 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
stateInternals,
timerInternals,
new OutputWindowedValueToBundle<>(bundle),
- new SideInputAccess() {
+ new SideInputReader() {
@Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
throw new UnsupportedOperationException(
"GroupAlsoByWindow must not have side inputs");
}
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException();
+ }
},
droppedDueToClosedWindow,
reduceFn,
@@ -276,7 +283,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn");
+ throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index dc0ef0f..db045f5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -64,13 +64,14 @@ public class FlinkDoFnFunction<InputT, OutputT>
Iterable<WindowedValue<InputT>> values,
Collector<WindowedValue<OutputT>> out) throws Exception {
- FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>(
- serializedOptions.getPipelineOptions(),
- getRuntimeContext(),
- doFn,
- windowingStrategy,
- sideInputs, out
- );
+ FlinkSingleOutputProcessContext<InputT, OutputT> context =
+ new FlinkSingleOutputProcessContext<>(
+ serializedOptions.getPipelineOptions(),
+ getRuntimeContext(),
+ doFn,
+ windowingStrategy,
+ sideInputs,
+ out);
this.doFn.startBundle(context);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index b814015..2169785 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
import org.joda.time.Instant;
/**
@@ -162,19 +161,13 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
@Override
public <ViewT> ViewT sideInput(
PCollectionView<ViewT> view,
- BoundedWindow mainInputWindow) {
+ BoundedWindow sideInputWindow) {
checkNotNull(view, "View passed to sideInput cannot be null");
checkNotNull(
sideInputs.get(view),
"Side input for " + view + " not available.");
- // get the side input strategy for mapping the window
- WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
- BoundedWindow sideInputWindow =
- windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
-
Map<BoundedWindow, ViewT> sideInputs =
runtimeContext.getBroadcastVariableWithInitializer(
view.getTagInternal().getId(), new SideInputInitializer<>(view));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
index d67f6fd..529b1cc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index fa08c5b..f4be121 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;
-import org.joda.time.Instant;
/**
@@ -88,12 +87,12 @@ public class DoFnFunction<InputT, OutputT>
}
@Override
- public synchronized void output(WindowedValue<OutputT> o) {
+ protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) {
outputs.add(o);
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) {
throw new UnsupportedOperationException(
"sideOutput is an unsupported operation for doFunctions, use a "
+ "MultiDoFunction instead.");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index d015b08..8175beb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -34,8 +34,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.joda.time.Instant;
-
import scala.Tuple2;
/**
@@ -98,12 +96,12 @@ public class MultiDoFnFunction<InputT, OutputT>
}
@Override
- public synchronized void output(WindowedValue<OutputT> o) {
+ protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) {
outputs.put(mMainOutputTag, o);
}
@Override
- public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+ protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) {
outputs.put(tag, output);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index afbc824..6a6cbd4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -150,9 +150,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
public void outputWithTimestamp(OutputT output, Instant timestamp) {
if (windowedValue == null) {
// this is start/finishBundle.
- output(noElementWindowedValue(output, timestamp, windowFn));
+ outputWindowedValue(noElementWindowedValue(output, timestamp, windowFn));
} else {
- output(WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+ outputWindowedValue(WindowedValue.of(output, timestamp, windowedValue.getWindows(),
windowedValue.getPane()));
}
}
@@ -167,15 +167,16 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
if (windowedValue == null) {
// this is start/finishBundle.
- sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn));
+ sideOutputWindowedValue(tag, noElementWindowedValue(output, timestamp, windowFn));
} else {
- sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+ sideOutputWindowedValue(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
windowedValue.getPane()));
}
}
- public abstract void output(WindowedValue<OutputT> output);
- public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output);
+ protected abstract void outputWindowedValue(WindowedValue<OutputT> output);
+
+ protected abstract <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output);
static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue(
final T output, final Instant timestamp, WindowFn<Object, W> windowFn) {
@@ -241,16 +242,24 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
@Override
- public void outputWindowedValue(OutputT output, Instant timestamp, Collection<?
- extends BoundedWindow> windows, PaneInfo paneInfo) {
- output(WindowedValue.of(output, timestamp, windows, paneInfo));
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo) {
+ SparkProcessContext.this.outputWindowedValue(
+ WindowedValue.of(output, timestamp, windows, paneInfo));
}
@Override
public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
- sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo));
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo) {
+ SparkProcessContext.this.sideOutputWindowedValue(
+ tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}
@Override
@@ -273,14 +282,14 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
@Override
- public <T> void writePCollectionViewData(TupleTag<?> tag,
- Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ public <T> void writePCollectionViewData(
+ TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
throw new UnsupportedOperationException(
"WindowingInternals#writePCollectionViewData() is not yet supported.");
}
@Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
throw new UnsupportedOperationException(
"WindowingInternals#sideInput() is not yet supported.");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index dd7d894..bbf0315 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -710,7 +710,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public <T> T sideInput(
- PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ PCollectionView<T> view, BoundedWindow sideInputWindow) {
throw new UnsupportedOperationException(
"SideInput from WindowingInternals is not supported in in the context of DoFnTester");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index ab3c600..5e90864 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -86,7 +86,7 @@ public interface WindowingInternals<InputT, OutputT> {
Coder<T> elemCoder) throws IOException;
/**
- * Return the value of the side input for the window of a main input element.
+ * Return the value of the side input for a particular side input window.
*/
- <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
+ <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
}