You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:20 UTC
[16/50] incubator-beam git commit: Liberates ReduceFnRunner from WindowingInternals
Liberates ReduceFnRunner from WindowingInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24cae56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24cae56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24cae56a
Branch: refs/heads/python-sdk
Commit: 24cae56ad3a7b25b9e2114907f1d069a243f87dd
Parents: 1543ea9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Nov 10 18:40:53 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 +-
.../beam/runners/core/OutputWindowedValue.java | 46 ++++++++++++
.../runners/core/ReduceFnContextFactory.java | 71 ++++++++++++++++---
.../beam/runners/core/ReduceFnRunner.java | 39 ++---------
.../beam/runners/core/SideInputAccess.java | 31 +++++++++
.../beam/runners/core/SimpleDoFnRunner.java | 12 ++++
.../beam/runners/core/SimpleOldDoFnRunner.java | 10 +++
.../core/WindowingInternalsAdapters.java | 65 +++++++++++++++++
.../beam/runners/core/ReduceFnTester.java | 67 ++++++------------
.../GroupAlsoByWindowEvaluatorFactory.java | 73 ++++++--------------
.../functions/FlinkProcessContext.java | 13 ++++
.../spark/translation/SparkProcessContext.java | 7 ++
.../apache/beam/sdk/transforms/DoFnTester.java | 16 +++--
.../beam/sdk/util/WindowingInternals.java | 10 +++
.../beam/sdk/util/state/StateContexts.java | 56 ---------------
16 files changed, 321 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index dde883c..bcc52d3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -87,7 +87,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
stateInternals,
timerInternals,
- c.windowingInternals(),
+ WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
+ WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
droppedDueToClosedWindow,
reduceFn,
c.getPipelineOptions());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index f1a6ded..45c0eda 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -69,14 +69,15 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
- new ReduceFnRunner<K, InputT, OutputT, W>(
+ new ReduceFnRunner<>(
key,
strategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
stateInternals,
timerInternals,
- c.windowingInternals(),
+ WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
+ WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
droppedDueToClosedWindow,
reduceFn,
c.getPipelineOptions());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
new file mode 100644
index 0000000..08a0e81
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * An object that can output a value with all of its windowing information to the main output or
+ * a side output.
+ */
+public interface OutputWindowedValue<OutputT> {
+ /** Outputs a value with windowing information to the main output. */
+ void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane);
+
+ /** Outputs a value with windowing information to a side output. */
+ <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 500c6e7..668ef47 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
/**
@@ -62,20 +63,25 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private final StateInternals<K> stateInternals;
private final ActiveWindowSet<W> activeWindows;
private final TimerInternals timerInternals;
- private final WindowingInternals<?, ?> windowingInternals;
+ private final SideInputAccess sideInputAccess;
private final PipelineOptions options;
- ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
- WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
- ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
- WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
+ ReduceFnContextFactory(
+ K key,
+ ReduceFn<K, InputT, OutputT, W> reduceFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ StateInternals<K> stateInternals,
+ ActiveWindowSet<W> activeWindows,
+ TimerInternals timerInternals,
+ SideInputAccess sideInputAccess,
+ PipelineOptions options) {
this.key = key;
this.reduceFn = reduceFn;
this.windowingStrategy = windowingStrategy;
this.stateInternals = stateInternals;
this.activeWindows = activeWindows;
this.timerInternals = timerInternals;
- this.windowingInternals = windowingInternals;
+ this.sideInputAccess = sideInputAccess;
this.options = options;
}
@@ -90,7 +96,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
return new StateAccessorImpl<K, W>(
activeWindows, windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
+ stateInternals, stateContextFromComponents(options, sideInputAccess, window),
style);
}
@@ -217,7 +223,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
W mergeResult) {
super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(mergeResult), style);
+ stateContextForWindowOnly(mergeResult), style);
this.activeToBeMerged = activeToBeMerged;
}
@@ -262,7 +268,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
StateInternals<K> stateInternals, W window) {
super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(window), StateStyle.RENAMED);
+ stateContextForWindowOnly(window), StateStyle.RENAMED);
}
Collection<W> mergingWindows() {
@@ -496,4 +502,51 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
return timers;
}
}
+
+ private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents(
+ @Nullable final PipelineOptions options,
+ final SideInputAccess sideInputAccess,
+ final W window) {
+ if (options == null) {
+ return StateContexts.nullContext();
+ } else {
+ return new StateContext<W>() {
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return sideInputAccess.sideInput(view, window);
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+ }
+ }
+
+ /** Returns a {@link StateContext} that only contains the state window. */
+ private static <W extends BoundedWindow> StateContext<W> stateContextForWindowOnly(
+ final W window) {
+ return new StateContext<W>() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ throw new IllegalArgumentException(
+ "cannot call getPipelineOptions() in a window only context");
+ }
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new IllegalArgumentException("cannot call sideInput() in a window only context");
+ }
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 66fb27c..023a77a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.util.state.ReadableState;
@@ -217,7 +216,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
ExecutableTriggerStateMachine triggerStateMachine,
StateInternals<K> stateInternals,
TimerInternals timerInternals,
- WindowingInternals<?, KV<K, OutputT>> windowingInternals,
+ OutputWindowedValue<KV<K, OutputT>> outputter,
+ SideInputAccess sideInputAccess,
Aggregator<Long, Long> droppedDueToClosedWindow,
ReduceFn<K, InputT, OutputT, W> reduceFn,
PipelineOptions options) {
@@ -225,7 +225,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
this.timerInternals = timerInternals;
this.paneInfoTracker = new PaneInfoTracker(timerInternals);
this.stateInternals = stateInternals;
- this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
+ this.outputter = outputter;
this.droppedDueToClosedWindow = droppedDueToClosedWindow;
this.reduceFn = reduceFn;
@@ -240,8 +240,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
this.activeWindows = createActiveWindowSet();
this.contextFactory =
- new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
- stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
+ new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy,
+ stateInternals, this.activeWindows, timerInternals, sideInputAccess, options);
this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
this.triggerRunner =
@@ -965,33 +965,4 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
}
}
-
- /**
- * An object that can output a value with all of its windowing information. This is a deliberately
- * restricted subinterface of {@link WindowingInternals} to express how it is used here.
- */
- private interface OutputWindowedValue<OutputT> {
- void outputWindowedValue(OutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane);
- }
-
- private static class OutputViaWindowingInternals<OutputT>
- implements OutputWindowedValue<OutputT> {
-
- private final WindowingInternals<?, OutputT> windowingInternals;
-
- public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
- this.windowingInternals = windowingInternals;
- }
-
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
new file mode 100644
index 0000000..7d64566
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Allows accessing the side inputs for a particular main input window.
+ */
+public interface SideInputAccess {
+ /**
+ * Return the value of the side input for the window of a main input element.
+ */
+ <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c046d11..c0f3a02 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -66,6 +66,10 @@ import org.joda.time.format.PeriodFormat;
/**
* Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
*
+ * <p>Also, if the {@link DoFn} observes the window of the element, then {@link SimpleDoFnRunner}
+ * explodes windows of the input {@link WindowedValue} and calls {@link DoFn.ProcessElement} for
+ * each window individually.
+ *
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
*/
@@ -627,6 +631,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
PaneInfo pane) {}
@Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {}
+
+ @Override
public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
return context.sideInput(view, mainInputWindow);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1298fc8..8efc27b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -472,6 +472,16 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ context.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+ }
+
+ @Override
public Collection<? extends BoundedWindow> windows() {
return windowedValue.getWindows();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
new file mode 100644
index 0000000..1b47e2b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Adapters from {@link WindowingInternals} to {@link SideInputAccess} and {@link
+ * OutputWindowedValue}.
+ */
+public class WindowingInternalsAdapters {
+ static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) {
+ return new SideInputAccess() {
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ return windowingInternals.sideInput(view, mainInputWindow);
+ }
+ };
+ }
+
+ public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue(
+ final WindowingInternals<?, OutputT> windowingInternals) {
+ return new OutputWindowedValue<OutputT>() {
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ windowingInternals.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f5ab8ea..5f8424e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -75,7 +74,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
@@ -106,7 +104,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private final TestTimerInternals timerInternals = new TestTimerInternals();
private final WindowFn<Object, W> windowFn;
- private final TestWindowingInternals windowingInternals;
+ private final TestOutputWindowedValue testOutputter;
+ private final TestSideInputAccess testSideInputAccess;
private final Coder<OutputT> outputCoder;
private final WindowingStrategy<Object, W> objectStrategy;
private final ExecutableTriggerStateMachine executableTriggerStateMachine;
@@ -291,7 +290,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
this.objectStrategy = objectStrategy;
this.reduceFn = reduceFn;
this.windowFn = objectStrategy.getWindowFn();
- this.windowingInternals = new TestWindowingInternals(sideInputReader);
+ this.testOutputter = new TestOutputWindowedValue();
+ this.testSideInputAccess = new TestSideInputAccess(sideInputReader);
this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
this.outputCoder = outputCoder;
this.options = options;
@@ -313,7 +313,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
executableTriggerStateMachine,
stateInternals,
timerInternals,
- windowingInternals,
+ testOutputter,
+ testSideInputAccess,
droppedDueToClosedWindow,
reduceFn,
options);
@@ -418,7 +419,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
* How many panes do we have in the output?
*/
public int getOutputSize() {
- return windowingInternals.outputs.size();
+ return testOutputter.outputs.size();
}
/**
@@ -426,7 +427,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
*/
public List<WindowedValue<OutputT>> extractOutput() {
ImmutableList<WindowedValue<OutputT>> result =
- FluentIterable.from(windowingInternals.outputs)
+ FluentIterable.from(testOutputter.outputs)
.transform(new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() {
@Override
public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) {
@@ -434,7 +435,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
})
.toList();
- windowingInternals.outputs.clear();
+ testOutputter.outputs.clear();
return result;
}
@@ -517,18 +518,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/**
* Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
* elements.
- */
- private class TestWindowingInternals implements WindowingInternals<InputT, KV<String, OutputT>> {
+ */private class TestOutputWindowedValue implements OutputWindowedValue<KV<String, OutputT>> {
private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
- private SideInputReader sideInputReader;
-
- private TestWindowingInternals(SideInputReader sideInputReader) {
- this.sideInputReader = sideInputReader;
- }
@Override
public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
// Copy the output value (using coders) before capturing it.
KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow");
@@ -537,37 +532,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
@Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException(
- "Testing triggers should not use timers from WindowingInternals.");
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- throw new UnsupportedOperationException(
- "Testing triggers should not use windows from WindowingInternals.");
- }
-
- @Override
- public PaneInfo pane() {
- throw new UnsupportedOperationException(
- "Testing triggers should not use pane from WindowingInternals.");
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException();
}
+ }
- @Override
- public <T> void writePCollectionViewData(
- TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
- throw new UnsupportedOperationException(
- "Testing triggers should not use writePCollectionViewData from WindowingInternals.");
- }
+ private class TestSideInputAccess implements SideInputAccess {
+ private SideInputReader sideInputReader;
- @Override
- public StateInternals<Object> stateInternals() {
- // Safe for testing only
- @SuppressWarnings({"unchecked", "rawtypes"})
- TestInMemoryStateInternals<Object> untypedStateInternals =
- (TestInMemoryStateInternals) stateInternals;
- return untypedStateInternals;
+ private TestSideInputAccess(SideInputReader sideInputReader) {
+ this.sideInputReader = sideInputReader;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index e5c5e4b..0e8adba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -29,7 +29,9 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SideInputAccess;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -173,7 +175,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
stateInternals,
timerInternals,
- new DirectWindowingInternals<>(bundle),
+ new OutputWindowedValueToBundle<>(bundle),
+ new SideInputAccess() {
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ throw new UnsupportedOperationException(
+ "GroupAlsoByWindow must not have side inputs");
+ }
+ },
droppedDueToClosedWindow,
reduceFn,
evaluationContext.getPipelineOptions());
@@ -243,26 +252,15 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
}
}
- private static class DirectWindowingInternals<K, V>
- implements WindowingInternals<Object, KV<K, Iterable<V>>> {
+ private static class OutputWindowedValueToBundle<K, V>
+ implements OutputWindowedValue<KV<K, Iterable<V>>> {
private final UncommittedBundle<KV<K, Iterable<V>>> bundle;
- private DirectWindowingInternals(
- UncommittedBundle<KV<K, Iterable<V>>> bundle) {
+ private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> bundle) {
this.bundle = bundle;
}
@Override
- public StateInternals<?> stateInternals() {
- throw new UnsupportedOperationException(
- String.format(
- "%s should use the %s it is provided rather than the contents of %s",
- ReduceFnRunner.class.getSimpleName(),
- StateInternals.class.getSimpleName(),
- getClass().getSimpleName()));
- }
-
- @Override
public void outputWindowedValue(
KV<K, Iterable<V>> output,
Instant timestamp,
@@ -272,44 +270,13 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException(
- String.format(
- "%s should use the %s it is provided rather than the contents of %s",
- ReduceFnRunner.class.getSimpleName(),
- TimerInternals.class.getSimpleName(),
- getClass().getSimpleName()));
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- throw new IllegalArgumentException(
- String.format(
- "%s should not access Windows via %s.windows(); "
- + "it should instead inspect the window of the input elements",
- GroupAlsoByWindowEvaluator.class.getSimpleName(),
- WindowingInternals.class.getSimpleName()));
- }
-
- @Override
- public PaneInfo pane() {
- throw new IllegalArgumentException(
- String.format(
- "%s should not access Windows via %s.windows(); "
- + "it should instead inspect the window of the input elements",
- GroupAlsoByWindowEvaluator.class.getSimpleName(),
- WindowingInternals.class.getSimpleName()));
- }
-
- @Override
- public <T> void writePCollectionViewData(
- TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- throw new UnsupportedOperationException();
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index baf97cb..1b28a70 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -166,11 +166,24 @@ class FlinkProcessContext<InputT, OutputT>
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
+ // TODO: Refactor this (get rid of duplication, move things around w.r.t.
+ // FlinkMultiOutputProcessContext)
collector.collect(WindowedValue.of(value, timestamp, windows, pane));
outputWithTimestampAndWindow(value, timestamp, windows, pane);
}
@Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ // TODO: Implement this
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public TimerInternals timerInternals() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 99cd522..f3152ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -246,6 +246,13 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
@Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public StateInternals stateInternals() {
//TODO: implement state internals.
// This is a temporary placeholder to get the TfIdfTest
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 7995719..dd7d894 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -278,7 +278,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
startBundle();
}
try {
- fn.processElement(createProcessContext(fn, element));
+ fn.processElement(createProcessContext(element));
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
@@ -606,9 +606,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
}
- private TestProcessContext createProcessContext(
- OldDoFn<InputT, OutputT> fn,
- TimestampedValue<InputT> elem) {
+ private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) {
WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
elem.getValue(), elem.getTimestamp());
@@ -678,6 +676,16 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ context.noteOutput(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
public TimerInternals timerInternals() {
return timerInternals;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index 016276c..ab3c600 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -52,6 +52,16 @@ public interface WindowingInternals<InputT, OutputT> {
Collection<? extends BoundedWindow> windows, PaneInfo pane);
/**
+ * Output the value to a side output at the specified timestamp in the listed windows.
+ */
+ <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane);
+
+ /**
* Return the timer manager provided by the underlying system, or null if Timers need
* to be emulated.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
index d0c566d..81121e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.util.state;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
/**
@@ -51,58 +49,4 @@ public class StateContexts {
public static <W extends BoundedWindow> StateContext<W> nullContext() {
return (StateContext<W>) NULL_CONTEXT;
}
-
- /**
- * Returns a {@link StateContext} that only contains the state window.
- */
- public static <W extends BoundedWindow> StateContext<W> windowOnly(final W window) {
- return new StateContext<W>() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new IllegalArgumentException(
- "cannot call getPipelineOptions() in a window only context");
- }
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- throw new IllegalArgumentException("cannot call sideInput() in a window only context");
- }
- @Override
- public W window() {
- return window;
- }
- };
- }
-
- /**
- * Returns a {@link StateContext} from {@code PipelineOptions}, {@link WindowingInternals},
- * and the state window.
- */
- public static <W extends BoundedWindow> StateContext<W> createFromComponents(
- @Nullable final PipelineOptions options,
- final WindowingInternals<?, ?> windowingInternals,
- final W window) {
- @SuppressWarnings("unchecked")
- StateContext<W> typedNullContext = (StateContext<W>) NULL_CONTEXT;
- if (options == null) {
- return typedNullContext;
- } else {
- return new StateContext<W>() {
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return windowingInternals.sideInput(view, window);
- }
-
- @Override
- public W window() {
- return window;
- }
- };
- }
- }
}