You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:51:58 UTC

[02/50] incubator-beam git commit: Replaces SideInputAccess with SideInputReader

Replaces SideInputAccess with SideInputReader

Makes WindowingInternals.sideInput take the side input window
instead of the main input window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a0d0e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a0d0e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a0d0e1

Branch: refs/heads/gearpump-runner
Commit: 90a0d0e13fa0332df805b79b1dc64860d9590217
Parents: 8243fcd
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Nov 14 14:48:31 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 26 ++++++++++---
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  2 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  2 +-
 .../runners/core/ReduceFnContextFactory.java    | 27 +++++++++-----
 .../beam/runners/core/ReduceFnRunner.java       |  5 ++-
 .../beam/runners/core/SideInputAccess.java      | 31 ----------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 32 +++++++---------
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 11 +++---
 .../core/WindowingInternalsAdapters.java        | 21 ++++++++---
 .../beam/runners/core/ReduceFnTester.java       | 34 +++++------------
 .../GroupAlsoByWindowEvaluatorFactory.java      | 21 +++++++----
 .../functions/FlinkDoFnFunction.java            | 15 ++++----
 .../functions/FlinkProcessContextBase.java      |  9 +----
 .../FlinkSingleOutputProcessContext.java        |  1 -
 .../runners/spark/translation/DoFnFunction.java |  5 +--
 .../spark/translation/MultiDoFnFunction.java    |  6 +--
 .../spark/translation/SparkProcessContext.java  | 39 ++++++++++++--------
 .../apache/beam/sdk/transforms/DoFnTester.java  |  2 +-
 .../beam/sdk/util/WindowingInternals.java       |  4 +-
 19 files changed, 141 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 8fbfb03..eca4308 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -354,13 +354,26 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         }
 
         @Override
-        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        public void outputWindowedValue(
+            KV<K, Iterable<V>> output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
           if (traceTuples) {
             LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
           }
-          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
-              WindowedValue.of(output, timestamp, windows, pane)));
+          ApexGroupByKeyOperator.this.output.emit(
+              ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+        }
+
+        @Override
+        public <SideOutputT> void sideOutputWindowedValue(
+            TupleTag<SideOutputT> tag,
+            SideOutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
         }
 
         @Override
@@ -379,8 +392,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         }
 
         @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder)
+            throws IOException {
           throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index bcc52d3..8b10813 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -88,7 +88,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             stateInternals,
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
-            WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
+            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
             droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 45c0eda..f8f6207 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -77,7 +77,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
             stateInternals,
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
-            WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()),
+            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
             droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index d43fb8e..539126a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -62,7 +64,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   private final StateInternals<K> stateInternals;
   private final ActiveWindowSet<W> activeWindows;
   private final TimerInternals timerInternals;
-  private final SideInputAccess sideInputAccess;
+  private final SideInputReader sideInputReader;
   private final PipelineOptions options;
 
   ReduceFnContextFactory(
@@ -72,7 +74,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
       StateInternals<K> stateInternals,
       ActiveWindowSet<W> activeWindows,
       TimerInternals timerInternals,
-      SideInputAccess sideInputAccess,
+      SideInputReader sideInputReader,
       PipelineOptions options) {
     this.key = key;
     this.reduceFn = reduceFn;
@@ -80,7 +82,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     this.stateInternals = stateInternals;
     this.activeWindows = activeWindows;
     this.timerInternals = timerInternals;
-    this.sideInputAccess = sideInputAccess;
+    this.sideInputReader = sideInputReader;
     this.options = options;
   }
 
@@ -94,8 +96,14 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
   private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
     return new StateAccessorImpl<K, W>(
-        activeWindows, windowingStrategy.getWindowFn().windowCoder(),
-        stateInternals, stateContextFromComponents(options, sideInputAccess, window),
+        activeWindows,
+        windowingStrategy.getWindowFn().windowCoder(),
+        stateInternals,
+        stateContextFromComponents(
+            options,
+            sideInputReader,
+            window,
+            windowingStrategy.getWindowFn()),
         style);
   }
 
@@ -504,8 +512,9 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
   private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents(
       @Nullable final PipelineOptions options,
-      final SideInputAccess sideInputAccess,
-      final W window) {
+      final SideInputReader sideInputReader,
+      final W mainInputWindow,
+      final WindowFn<?, W> windowFn) {
     if (options == null) {
       return StateContexts.nullContext();
     } else {
@@ -518,12 +527,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
         @Override
         public <T> T sideInput(PCollectionView<T> view) {
-          return sideInputAccess.sideInput(view, window);
+          return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow));
         }
 
         @Override
         public W window() {
-          return window;
+          return mainInputWindow;
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 023a77a..a686f46 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ActiveWindowSet;
 import org.apache.beam.sdk.util.MergingActiveWindowSet;
 import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -217,7 +218,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
       StateInternals<K> stateInternals,
       TimerInternals timerInternals,
       OutputWindowedValue<KV<K, OutputT>> outputter,
-      SideInputAccess sideInputAccess,
+      SideInputReader sideInputReader,
       Aggregator<Long, Long> droppedDueToClosedWindow,
       ReduceFn<K, InputT, OutputT, W> reduceFn,
       PipelineOptions options) {
@@ -241,7 +242,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
 
     this.contextFactory =
         new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy,
-            stateInternals, this.activeWindows, timerInternals, sideInputAccess, options);
+            stateInternals, this.activeWindows, timerInternals, sideInputReader, options);
 
     this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
     this.triggerRunner =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
deleted file mode 100644
index 7d64566..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Allows accessing the side inputs for a particular main input window.
- */
-public interface SideInputAccess {
-  /**
-   * Return the value of the side input for the window of a main input element.
-   */
-  <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c0f3a02..76aae8f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -142,7 +142,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   }
 
   private void invokeProcessElement(WindowedValue<InputT> elem) {
-    DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
+    final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem);
 
     // This can contain user code. Wrap it in case it throws an exception.
     try {
@@ -283,12 +283,10 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       return WindowedValue.of(output, timestamp, windows, pane);
     }
 
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
       if (!sideInputReader.contains(view)) {
         throw new IllegalArgumentException("calling sideInput() with unknown view");
       }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
       return sideInputReader.get(view, sideInputWindow);
     }
 
@@ -432,7 +430,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     final DoFnContext<InputT, OutputT> context;
     final WindowedValue<InputT> windowedValue;
 
-    public DoFnProcessContext(
+    private DoFnProcessContext(
         DoFn<InputT, OutputT> fn,
         DoFnContext<InputT, OutputT> context,
         WindowedValue<InputT> windowedValue) {
@@ -473,7 +471,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
               "sideInput called when main input element is in multiple windows");
         }
       }
-      return context.sideInput(view, window);
+      return context.sideInput(
+          view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
     }
 
     @Override
@@ -493,14 +492,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
           output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
     }
 
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
       checkNotNull(tag, "Tag passed to sideOutput cannot be null");
@@ -628,7 +619,9 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
             OutputT output,
             Instant timestamp,
             Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {}
+            PaneInfo pane) {
+          throw new UnsupportedOperationException("A DoFn cannot output to a different window");
+        }
 
         @Override
         public <SideOutputT> void sideOutputWindowedValue(
@@ -636,11 +629,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
             SideOutputT output,
             Instant timestamp,
             Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {}
+            PaneInfo pane) {
+          throw new UnsupportedOperationException(
+              "A DoFn cannot side output to a different window");
+        }
 
         @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+          return context.sideInput(view, sideInputWindow);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 8efc27b..cbda791 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -251,12 +251,10 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
       return WindowedValue.of(output, timestamp, windows, pane);
     }
 
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
       if (!sideInputReader.contains(view)) {
         throw new IllegalArgumentException("calling sideInput() with unknown view");
       }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
       return sideInputReader.get(view, sideInputWindow);
     }
 
@@ -390,7 +388,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
               "sideInput called when main input element is in multiple windows");
         }
       }
-      return context.sideInput(view, window);
+      return context.sideInput(
+          view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
     }
 
     @Override
@@ -515,8 +514,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
         }
 
         @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+          return context.sideInput(view, sideInputWindow);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 1b47e2b..7f80844 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -20,21 +20,32 @@ package org.apache.beam.runners.core;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
- * Adapters from {@link WindowingInternals} to {@link SideInputAccess} and {@link
+ * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link
  * OutputWindowedValue}.
  */
 public class WindowingInternalsAdapters {
-  static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) {
-    return new SideInputAccess() {
+  static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) {
+    return new SideInputReader() {
       @Override
-      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-        return windowingInternals.sideInput(view, mainInputWindow);
+      public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+        return windowingInternals.sideInput(view, sideInputWindow);
+      }
+
+      @Override
+      public <T> boolean contains(PCollectionView<T> view) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public boolean isEmpty() {
+        throw new UnsupportedOperationException();
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 5f8424e..337be23 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -80,7 +80,6 @@ import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
 import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
@@ -105,7 +104,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   private final WindowFn<Object, W> windowFn;
   private final TestOutputWindowedValue testOutputter;
-  private final TestSideInputAccess testSideInputAccess;
+  private final SideInputReader sideInputReader;
   private final Coder<OutputT> outputCoder;
   private final WindowingStrategy<Object, W> objectStrategy;
   private final ExecutableTriggerStateMachine executableTriggerStateMachine;
@@ -291,7 +290,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     this.reduceFn = reduceFn;
     this.windowFn = objectStrategy.getWindowFn();
     this.testOutputter = new TestOutputWindowedValue();
-    this.testSideInputAccess = new TestSideInputAccess(sideInputReader);
+    this.sideInputReader = sideInputReader;
     this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
     this.outputCoder = outputCoder;
     this.options = options;
@@ -314,7 +313,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         stateInternals,
         timerInternals,
         testOutputter,
-        testSideInputAccess,
+        sideInputReader,
         droppedDueToClosedWindow,
         reduceFn,
         options);
@@ -522,8 +521,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
 
     @Override
-    public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
-                                    Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+    public void outputWindowedValue(
+        KV<String, OutputT> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
       // Copy the output value (using coders) before capturing it.
       KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
           KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow");
@@ -538,25 +540,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         Instant timestamp,
         Collection<? extends BoundedWindow> windows,
         PaneInfo pane) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private class TestSideInputAccess implements SideInputAccess {
-    private SideInputReader sideInputReader;
-
-    private TestSideInputAccess(SideInputReader sideInputReader) {
-      this.sideInputReader = sideInputReader;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0e8adba..a5bb214 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
@@ -31,7 +30,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindo
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
-import org.apache.beam.runners.core.SideInputAccess;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -47,14 +45,13 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -176,12 +173,22 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
               stateInternals,
               timerInternals,
               new OutputWindowedValueToBundle<>(bundle),
-              new SideInputAccess() {
+              new SideInputReader() {
                 @Override
-                public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+                public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
                   throw new UnsupportedOperationException(
                       "GroupAlsoByWindow must not have side inputs");
                 }
+
+                @Override
+                public <T> boolean contains(PCollectionView<T> view) {
+                  throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public boolean isEmpty() {
+                  throw new UnsupportedOperationException();
+                }
               },
               droppedDueToClosedWindow,
               reduceFn,
@@ -276,7 +283,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
         Instant timestamp,
         Collection<? extends BoundedWindow> windows,
         PaneInfo pane) {
-      throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn");
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index dc0ef0f..db045f5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -64,13 +64,14 @@ public class FlinkDoFnFunction<InputT, OutputT>
       Iterable<WindowedValue<InputT>> values,
       Collector<WindowedValue<OutputT>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>(
-        serializedOptions.getPipelineOptions(),
-        getRuntimeContext(),
-        doFn,
-        windowingStrategy,
-        sideInputs, out
-    );
+    FlinkSingleOutputProcessContext<InputT, OutputT> context =
+        new FlinkSingleOutputProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            sideInputs,
+            out);
 
     this.doFn.startBundle(context);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index b814015..2169785 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 
 /**
@@ -162,19 +161,13 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
       @Override
       public <ViewT> ViewT sideInput(
           PCollectionView<ViewT> view,
-          BoundedWindow mainInputWindow) {
+          BoundedWindow sideInputWindow) {
 
         checkNotNull(view, "View passed to sideInput cannot be null");
         checkNotNull(
             sideInputs.get(view),
             "Side input for " + view + " not available.");
 
-        // get the side input strategy for mapping the window
-        WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
-        BoundedWindow sideInputWindow =
-            windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
-
         Map<BoundedWindow, ViewT> sideInputs =
             runtimeContext.getBroadcastVariableWithInitializer(
                 view.getTagInternal().getId(), new SideInputInitializer<>(view));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
index d67f6fd..529b1cc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index fa08c5b..f4be121 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.joda.time.Instant;
 
 
 /**
@@ -88,12 +87,12 @@ public class DoFnFunction<InputT, OutputT>
     }
 
     @Override
-    public synchronized void output(WindowedValue<OutputT> o) {
+    protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) {
       outputs.add(o);
     }
 
     @Override
-    public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) {
       throw new UnsupportedOperationException(
           "sideOutput is an unsupported operation for doFunctions, use a "
               + "MultiDoFunction instead.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index d015b08..8175beb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -34,8 +34,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.joda.time.Instant;
-
 import scala.Tuple2;
 
 /**
@@ -98,12 +96,12 @@ public class MultiDoFnFunction<InputT, OutputT>
     }
 
     @Override
-    public synchronized void output(WindowedValue<OutputT> o) {
+    protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) {
       outputs.put(mMainOutputTag, o);
     }
 
     @Override
-    public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) {
       outputs.put(tag, output);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index afbc824..6a6cbd4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -150,9 +150,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   public void outputWithTimestamp(OutputT output, Instant timestamp) {
     if (windowedValue == null) {
       // this is start/finishBundle.
-      output(noElementWindowedValue(output, timestamp, windowFn));
+      outputWindowedValue(noElementWindowedValue(output, timestamp, windowFn));
     } else {
-      output(WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+      outputWindowedValue(WindowedValue.of(output, timestamp, windowedValue.getWindows(),
           windowedValue.getPane()));
     }
   }
@@ -167,15 +167,16 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
     if (windowedValue == null) {
       // this is start/finishBundle.
-      sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn));
+      sideOutputWindowedValue(tag, noElementWindowedValue(output, timestamp, windowFn));
     } else {
-      sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+      sideOutputWindowedValue(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
           windowedValue.getPane()));
     }
   }
 
-  public abstract void output(WindowedValue<OutputT> output);
-  public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output);
+  protected abstract void outputWindowedValue(WindowedValue<OutputT> output);
+
+  protected abstract <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output);
 
   static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue(
       final T output, final Instant timestamp, WindowFn<Object, W> windowFn) {
@@ -241,16 +242,24 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
       }
 
       @Override
-      public void outputWindowedValue(OutputT output, Instant timestamp, Collection<?
-          extends BoundedWindow> windows, PaneInfo paneInfo) {
-        output(WindowedValue.of(output, timestamp, windows, paneInfo));
+      public void outputWindowedValue(
+          OutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo paneInfo) {
+        SparkProcessContext.this.outputWindowedValue(
+            WindowedValue.of(output, timestamp, windows, paneInfo));
       }
 
       @Override
       public <SideOutputT> void sideOutputWindowedValue(
-          TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
-          Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
-        sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo));
+          TupleTag<SideOutputT> tag,
+          SideOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo paneInfo) {
+        SparkProcessContext.this.sideOutputWindowedValue(
+            tag, WindowedValue.of(output, timestamp, windows, paneInfo));
       }
 
       @Override
@@ -273,14 +282,14 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
       }
 
       @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag,
-          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+      public <T> void writePCollectionViewData(
+          TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
         throw new UnsupportedOperationException(
             "WindowingInternals#writePCollectionViewData() is not yet supported.");
       }
 
       @Override
-      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
         throw new UnsupportedOperationException(
             "WindowingInternals#sideInput() is not yet supported.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index dd7d894..bbf0315 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -710,7 +710,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
         @Override
         public <T> T sideInput(
-            PCollectionView<T> view, BoundedWindow mainInputWindow) {
+            PCollectionView<T> view, BoundedWindow sideInputWindow) {
           throw new UnsupportedOperationException(
               "SideInput from WindowingInternals is not supported in in the context of DoFnTester");
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index ab3c600..5e90864 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -86,7 +86,7 @@ public interface WindowingInternals<InputT, OutputT> {
       Coder<T> elemCoder) throws IOException;
 
   /**
-   * Return the value of the side input for the window of a main input element.
+   * Return the value of the side input for a particular side input window.
    */
-  <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow);
+  <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
 }