You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:20 UTC

[16/50] incubator-beam git commit: Liberates ReduceFnRunner from WindowingInternals

Liberates ReduceFnRunner from WindowingInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24cae56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24cae56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24cae56a

Branch: refs/heads/python-sdk
Commit: 24cae56ad3a7b25b9e2114907f1d069a243f87dd
Parents: 1543ea9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Nov 10 18:40:53 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  3 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  5 +-
 .../beam/runners/core/OutputWindowedValue.java  | 46 ++++++++++++
 .../runners/core/ReduceFnContextFactory.java    | 71 ++++++++++++++++---
 .../beam/runners/core/ReduceFnRunner.java       | 39 ++---------
 .../beam/runners/core/SideInputAccess.java      | 31 +++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     | 12 ++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 10 +++
 .../core/WindowingInternalsAdapters.java        | 65 +++++++++++++++++
 .../beam/runners/core/ReduceFnTester.java       | 67 ++++++------------
 .../GroupAlsoByWindowEvaluatorFactory.java      | 73 ++++++--------------
 .../functions/FlinkProcessContext.java          | 13 ++++
 .../spark/translation/SparkProcessContext.java  |  7 ++
 .../apache/beam/sdk/transforms/DoFnTester.java  | 16 +++--
 .../beam/sdk/util/WindowingInternals.java       | 10 +++
 .../beam/sdk/util/state/StateContexts.java      | 56 ---------------
 16 files changed, 321 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index dde883c..bcc52d3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -87,7 +87,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
                 TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
             stateInternals,
             timerInternals,
-            c.windowingInternals(),
+            WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
+            WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
             droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index f1a6ded..45c0eda 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -69,14 +69,15 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
     StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
-        new ReduceFnRunner<K, InputT, OutputT, W>(
+        new ReduceFnRunner<>(
             key,
             strategy,
             ExecutableTriggerStateMachine.create(
                 TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
             stateInternals,
             timerInternals,
-            c.windowingInternals(),
+            WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
+            WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
             droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
new file mode 100644
index 0000000..08a0e81
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * An object that can output a value with all of its windowing information to the main output or
+ * a side output.
+ */
+public interface OutputWindowedValue<OutputT> {
+  /** Outputs a value with windowing information to the main output. */
+  void outputWindowedValue(
+      OutputT output,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane);
+
+  /** Outputs a value with windowing information to a side output. */
+  <SideOutputT> void sideOutputWindowedValue(
+      TupleTag<SideOutputT> tag,
+      SideOutputT output,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 500c6e7..668ef47 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
 import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 
 /**
@@ -62,20 +63,25 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   private final StateInternals<K> stateInternals;
   private final ActiveWindowSet<W> activeWindows;
   private final TimerInternals timerInternals;
-  private final WindowingInternals<?, ?> windowingInternals;
+  private final SideInputAccess sideInputAccess;
   private final PipelineOptions options;
 
-  ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
-      WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
-      ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
-      WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
+  ReduceFnContextFactory(
+      K key,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      ActiveWindowSet<W> activeWindows,
+      TimerInternals timerInternals,
+      SideInputAccess sideInputAccess,
+      PipelineOptions options) {
     this.key = key;
     this.reduceFn = reduceFn;
     this.windowingStrategy = windowingStrategy;
     this.stateInternals = stateInternals;
     this.activeWindows = activeWindows;
     this.timerInternals = timerInternals;
-    this.windowingInternals = windowingInternals;
+    this.sideInputAccess = sideInputAccess;
     this.options = options;
   }
 
@@ -90,7 +96,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
     return new StateAccessorImpl<K, W>(
         activeWindows, windowingStrategy.getWindowFn().windowCoder(),
-        stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
+        stateInternals, stateContextFromComponents(options, sideInputAccess, window),
         style);
   }
 
@@ -217,7 +223,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
         StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
         W mergeResult) {
       super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(mergeResult), style);
+          stateContextForWindowOnly(mergeResult), style);
       this.activeToBeMerged = activeToBeMerged;
     }
 
@@ -262,7 +268,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
         StateInternals<K> stateInternals, W window) {
       super(activeWindows, windowCoder, stateInternals,
-          StateContexts.windowOnly(window), StateStyle.RENAMED);
+          stateContextForWindowOnly(window), StateStyle.RENAMED);
     }
 
     Collection<W> mergingWindows() {
@@ -496,4 +502,51 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
       return timers;
     }
   }
+
+  private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents(
+      @Nullable final PipelineOptions options,
+      final SideInputAccess sideInputAccess,
+      final W window) {
+    if (options == null) {
+      return StateContexts.nullContext();
+    } else {
+      return new StateContext<W>() {
+
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          return options;
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          return sideInputAccess.sideInput(view, window);
+        }
+
+        @Override
+        public W window() {
+          return window;
+        }
+      };
+    }
+  }
+
+  /** Returns a {@link StateContext} that only contains the state window. */
+  private static <W extends BoundedWindow> StateContext<W> stateContextForWindowOnly(
+      final W window) {
+    return new StateContext<W>() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        throw new IllegalArgumentException(
+            "cannot call getPipelineOptions() in a window only context");
+      }
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        throw new IllegalArgumentException("cannot call sideInput() in a window only context");
+      }
+      @Override
+      public W window() {
+        return window;
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 66fb27c..023a77a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.ReadableState;
@@ -217,7 +216,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
       ExecutableTriggerStateMachine triggerStateMachine,
       StateInternals<K> stateInternals,
       TimerInternals timerInternals,
-      WindowingInternals<?, KV<K, OutputT>> windowingInternals,
+      OutputWindowedValue<KV<K, OutputT>> outputter,
+      SideInputAccess sideInputAccess,
       Aggregator<Long, Long> droppedDueToClosedWindow,
       ReduceFn<K, InputT, OutputT, W> reduceFn,
       PipelineOptions options) {
@@ -225,7 +225,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     this.timerInternals = timerInternals;
     this.paneInfoTracker = new PaneInfoTracker(timerInternals);
     this.stateInternals = stateInternals;
-    this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
+    this.outputter = outputter;
     this.droppedDueToClosedWindow = droppedDueToClosedWindow;
     this.reduceFn = reduceFn;
 
@@ -240,8 +240,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     this.activeWindows = createActiveWindowSet();
 
     this.contextFactory =
-        new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
-            stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
+        new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy,
+            stateInternals, this.activeWindows, timerInternals, sideInputAccess, options);
 
     this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
     this.triggerRunner =
@@ -965,33 +965,4 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
       return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     }
   }
-
-  /**
-   * An object that can output a value with all of its windowing information. This is a deliberately
-   * restricted subinterface of {@link WindowingInternals} to express how it is used here.
-   */
-  private interface OutputWindowedValue<OutputT> {
-    void outputWindowedValue(OutputT output, Instant timestamp,
-        Collection<? extends BoundedWindow> windows, PaneInfo pane);
-  }
-
-  private static class OutputViaWindowingInternals<OutputT>
-      implements OutputWindowedValue<OutputT> {
-
-    private final WindowingInternals<?, OutputT> windowingInternals;
-
-    public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
-      this.windowingInternals = windowingInternals;
-    }
-
-    @Override
-    public void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
new file mode 100644
index 0000000..7d64566
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Allows accessing the side inputs for a particular main input window.
+ */
+public interface SideInputAccess {
+  /**
+   * Returns the value of the side input for the window of a main input element.
+   */
+  <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c046d11..c0f3a02 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -66,6 +66,10 @@ import org.joda.time.format.PeriodFormat;
 /**
  * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
  *
+ * <p>Also, if the {@link DoFn} observes the window of the element, then {@link SimpleDoFnRunner}
+ * explodes windows of the input {@link WindowedValue} and calls {@link DoFn.ProcessElement} for
+ * each window individually.
+ *
  * @param <InputT> the type of the {@link DoFn} (main) input elements
  * @param <OutputT> the type of the {@link DoFn} (main) output elements
  */
@@ -627,6 +631,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
             PaneInfo pane) {}
 
         @Override
+        public <SideOutputT> void sideOutputWindowedValue(
+            TupleTag<SideOutputT> tag,
+            SideOutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {}
+
+        @Override
         public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
           return context.sideInput(view, mainInputWindow);
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 1298fc8..8efc27b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -472,6 +472,16 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
         }
 
         @Override
+        public <SideOutputT> void sideOutputWindowedValue(
+            TupleTag<SideOutputT> tag,
+            SideOutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          context.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+        }
+
+        @Override
         public Collection<? extends BoundedWindow> windows() {
           return windowedValue.getWindows();
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
new file mode 100644
index 0000000..1b47e2b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Adapters from {@link WindowingInternals} to {@link SideInputAccess} and {@link
+ * OutputWindowedValue}.
+ */
+public class WindowingInternalsAdapters {
+  static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) {
+    return new SideInputAccess() {
+      @Override
+      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+        return windowingInternals.sideInput(view, mainInputWindow);
+      }
+    };
+  }
+
+  public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue(
+      final WindowingInternals<?, OutputT> windowingInternals) {
+    return new OutputWindowedValue<OutputT>() {
+      @Override
+      public void outputWindowedValue(
+          OutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
+      }
+
+      @Override
+      public <SideOutputT> void sideOutputWindowedValue(
+          TupleTag<SideOutputT> tag,
+          SideOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        windowingInternals.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f5ab8ea..5f8424e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -75,7 +74,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
@@ -106,7 +104,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   private final TestTimerInternals timerInternals = new TestTimerInternals();
 
   private final WindowFn<Object, W> windowFn;
-  private final TestWindowingInternals windowingInternals;
+  private final TestOutputWindowedValue testOutputter;
+  private final TestSideInputAccess testSideInputAccess;
   private final Coder<OutputT> outputCoder;
   private final WindowingStrategy<Object, W> objectStrategy;
   private final ExecutableTriggerStateMachine executableTriggerStateMachine;
@@ -291,7 +290,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     this.objectStrategy = objectStrategy;
     this.reduceFn = reduceFn;
     this.windowFn = objectStrategy.getWindowFn();
-    this.windowingInternals = new TestWindowingInternals(sideInputReader);
+    this.testOutputter = new TestOutputWindowedValue();
+    this.testSideInputAccess = new TestSideInputAccess(sideInputReader);
     this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
     this.outputCoder = outputCoder;
     this.options = options;
@@ -313,7 +313,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         executableTriggerStateMachine,
         stateInternals,
         timerInternals,
-        windowingInternals,
+        testOutputter,
+        testSideInputAccess,
         droppedDueToClosedWindow,
         reduceFn,
         options);
@@ -418,7 +419,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    * How many panes do we have in the output?
    */
   public int getOutputSize() {
-    return windowingInternals.outputs.size();
+    return testOutputter.outputs.size();
   }
 
   /**
@@ -426,7 +427,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    */
   public List<WindowedValue<OutputT>> extractOutput() {
     ImmutableList<WindowedValue<OutputT>> result =
-        FluentIterable.from(windowingInternals.outputs)
+        FluentIterable.from(testOutputter.outputs)
             .transform(new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() {
               @Override
               public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) {
@@ -434,7 +435,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
               }
             })
             .toList();
-    windowingInternals.outputs.clear();
+    testOutputter.outputs.clear();
     return result;
   }
 
@@ -517,18 +518,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   /**
    * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
    * elements.
-   */
-  private class TestWindowingInternals implements WindowingInternals<InputT, KV<String, OutputT>> {
+   */private class TestOutputWindowedValue implements OutputWindowedValue<KV<String, OutputT>> {
     private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
-    private SideInputReader sideInputReader;
-
-    private TestWindowingInternals(SideInputReader sideInputReader) {
-      this.sideInputReader = sideInputReader;
-    }
 
     @Override
     public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
-        Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+                                    Collection<? extends BoundedWindow> windows, PaneInfo pane) {
       // Copy the output value (using coders) before capturing it.
       KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
           KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow");
@@ -537,37 +532,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException(
-          "Testing triggers should not use timers from WindowingInternals.");
-    }
-
-    @Override
-    public Collection<? extends BoundedWindow> windows() {
-      throw new UnsupportedOperationException(
-          "Testing triggers should not use windows from WindowingInternals.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "Testing triggers should not use pane from WindowingInternals.");
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException();
     }
+  }
 
-    @Override
-    public <T> void writePCollectionViewData(
-        TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-      throw new UnsupportedOperationException(
-          "Testing triggers should not use writePCollectionViewData from WindowingInternals.");
-    }
+  private class TestSideInputAccess implements SideInputAccess {
+    private SideInputReader sideInputReader;
 
-    @Override
-    public StateInternals<Object> stateInternals() {
-      // Safe for testing only
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      TestInMemoryStateInternals<Object> untypedStateInternals =
-          (TestInMemoryStateInternals) stateInternals;
-      return untypedStateInternals;
+    private TestSideInputAccess(SideInputReader sideInputReader) {
+      this.sideInputReader = sideInputReader;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index e5c5e4b..0e8adba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -29,7 +29,9 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SideInputAccess;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -173,7 +175,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
                   TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
               stateInternals,
               timerInternals,
-              new DirectWindowingInternals<>(bundle),
+              new OutputWindowedValueToBundle<>(bundle),
+              new SideInputAccess() {
+                @Override
+                public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+                  throw new UnsupportedOperationException(
+                      "GroupAlsoByWindow must not have side inputs");
+                }
+              },
               droppedDueToClosedWindow,
               reduceFn,
               evaluationContext.getPipelineOptions());
@@ -243,26 +252,15 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
   }
 
-  private static class DirectWindowingInternals<K, V>
-      implements WindowingInternals<Object, KV<K, Iterable<V>>> {
+  private static class OutputWindowedValueToBundle<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
     private final UncommittedBundle<KV<K, Iterable<V>>> bundle;
 
-    private DirectWindowingInternals(
-        UncommittedBundle<KV<K, Iterable<V>>> bundle) {
+    private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> bundle) {
       this.bundle = bundle;
     }
 
     @Override
-    public StateInternals<?> stateInternals() {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should use the %s it is provided rather than the contents of %s",
-              ReduceFnRunner.class.getSimpleName(),
-              StateInternals.class.getSimpleName(),
-              getClass().getSimpleName()));
-    }
-
-    @Override
     public void outputWindowedValue(
         KV<K, Iterable<V>> output,
         Instant timestamp,
@@ -272,44 +270,13 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should use the %s it is provided rather than the contents of %s",
-              ReduceFnRunner.class.getSimpleName(),
-              TimerInternals.class.getSimpleName(),
-              getClass().getSimpleName()));
-    }
-
-    @Override
-    public Collection<? extends BoundedWindow> windows() {
-      throw new IllegalArgumentException(
-          String.format(
-              "%s should not access Windows via %s.windows(); "
-                  + "it should instead inspect the window of the input elements",
-              GroupAlsoByWindowEvaluator.class.getSimpleName(),
-              WindowingInternals.class.getSimpleName()));
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new IllegalArgumentException(
-          String.format(
-              "%s should not access Windows via %s.windows(); "
-                  + "it should instead inspect the window of the input elements",
-              GroupAlsoByWindowEvaluator.class.getSimpleName(),
-              WindowingInternals.class.getSimpleName()));
-    }
-
-    @Override
-    public <T> void writePCollectionViewData(
-        TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      throw new UnsupportedOperationException();
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index baf97cb..1b28a70 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -166,11 +166,24 @@ class FlinkProcessContext<InputT, OutputT>
           Instant timestamp,
           Collection<? extends BoundedWindow> windows,
           PaneInfo pane) {
+        // TODO: Refactor this (get rid of duplication, move things around w.r.t.
+        // FlinkMultiOutputProcessContext)
         collector.collect(WindowedValue.of(value, timestamp, windows, pane));
         outputWithTimestampAndWindow(value, timestamp, windows, pane);
       }
 
       @Override
+      public <SideOutputT> void sideOutputWindowedValue(
+          TupleTag<SideOutputT> tag,
+          SideOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        // TODO: Implement side outputs here; until then this method is unsupported.
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
       public TimerInternals timerInternals() {
         throw new UnsupportedOperationException();
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 99cd522..f3152ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -246,6 +246,13 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
       }
 
       @Override
+      public <SideOutputT> void sideOutputWindowedValue(
+          TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
+          Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
       public StateInternals stateInternals() {
         //TODO: implement state internals.
         // This is a temporary placeholder to get the TfIdfTest

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 7995719..dd7d894 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -278,7 +278,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       startBundle();
     }
     try {
-      fn.processElement(createProcessContext(fn, element));
+      fn.processElement(createProcessContext(element));
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
@@ -606,9 +606,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
   }
 
-  private TestProcessContext createProcessContext(
-      OldDoFn<InputT, OutputT> fn,
-      TimestampedValue<InputT> elem) {
+  private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) {
     WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
         elem.getValue(), elem.getTimestamp());
 
@@ -678,6 +676,16 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
         }
 
         @Override
+        public <SideOutputT> void sideOutputWindowedValue(
+            TupleTag<SideOutputT> tag,
+            SideOutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          context.noteOutput(tag, WindowedValue.of(output, timestamp, windows, pane));
+        }
+
+        @Override
         public TimerInternals timerInternals() {
           return timerInternals;
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index 016276c..ab3c600 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -52,6 +52,16 @@ public interface WindowingInternals<InputT, OutputT> {
       Collection<? extends BoundedWindow> windows, PaneInfo pane);
 
   /**
+   * Output the value to side output {@code tag} at the specified timestamp, windows, and pane.
+   */
+  <SideOutputT> void sideOutputWindowedValue(
+      TupleTag<SideOutputT> tag,
+      SideOutputT output,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane);
+
+  /**
    * Return the timer manager provided by the underlying system, or null if Timers need
    * to be emulated.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
index d0c566d..81121e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.util.state;
 
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -51,58 +49,4 @@ public class StateContexts {
   public static <W extends BoundedWindow> StateContext<W> nullContext() {
     return (StateContext<W>) NULL_CONTEXT;
   }
-
-  /**
-   * Returns a {@link StateContext} that only contains the state window.
-   */
-  public static <W extends BoundedWindow> StateContext<W> windowOnly(final W window) {
-    return new StateContext<W>() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        throw new IllegalArgumentException(
-            "cannot call getPipelineOptions() in a window only context");
-      }
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        throw new IllegalArgumentException("cannot call sideInput() in a window only context");
-      }
-      @Override
-      public W window() {
-        return window;
-      }
-    };
-  }
-
-  /**
-   * Returns a {@link StateContext} from {@code PipelineOptions}, {@link WindowingInternals},
-   * and the state window.
-   */
-  public static <W extends BoundedWindow> StateContext<W> createFromComponents(
-      @Nullable final PipelineOptions options,
-      final WindowingInternals<?, ?> windowingInternals,
-      final W window) {
-    @SuppressWarnings("unchecked")
-    StateContext<W> typedNullContext = (StateContext<W>) NULL_CONTEXT;
-    if (options == null) {
-      return typedNullContext;
-    } else {
-      return new StateContext<W>() {
-
-        @Override
-        public PipelineOptions getPipelineOptions() {
-          return options;
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view) {
-          return windowingInternals.sideInput(view, window);
-        }
-
-        @Override
-        public W window() {
-          return window;
-        }
-      };
-    }
-  }
 }