You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:13 UTC

[07/19] beam git commit: Move SideInputReader to runners/core-java

Move SideInputReader to runners/core-java


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6542eafc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6542eafc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6542eafc

Branch: refs/heads/master
Commit: 6542eafcf3a84105d2716cd0200d40ccdf764472
Parents: 1d4b1ed
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 19:46:59 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 16:06:55 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/ApexRunnerRegistrar.java  |  1 -
 .../beam/runners/apex/ApexRunnerResult.java     |  2 -
 .../beam/runners/apex/ApexYarnLauncher.java     |  2 -
 .../translation/ReadUnboundedTranslator.java    |  1 -
 .../apex/translation/TransformTranslator.java   |  1 -
 .../operators/ApexGroupByKeyOperator.java       |  2 +-
 .../operators/ApexParDoOperator.java            |  4 +-
 .../ApexReadUnboundedInputOperator.java         |  2 -
 .../utils/CoderAdapterStreamCodec.java          |  2 -
 .../utils/ValueAndCoderKryoSerializable.java    |  2 -
 .../apex/translation/utils/ValuesSource.java    |  2 -
 .../beam/runners/apex/ApexYarnLauncherTest.java |  2 -
 .../apex/examples/UnboundedTextSource.java      |  2 -
 .../runners/apex/examples/WordCountTest.java    |  2 -
 .../translation/ApexGroupByKeyOperatorTest.java |  2 -
 .../translation/utils/CollectionSource.java     |  2 -
 .../apache/beam/runners/core/DoFnRunners.java   |  2 -
 .../runners/core/GlobalCombineFnRunner.java     |  1 -
 .../runners/core/GlobalCombineFnRunners.java    | 41 ++++++++++---
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  1 -
 .../beam/runners/core/NullSideInputReader.java  | 61 ++++++++++++++++++++
 ...eBoundedSplittableProcessElementInvoker.java |  1 -
 .../beam/runners/core/ProcessFnRunner.java      |  1 -
 .../core/ReadyCheckingSideInputReader.java      | 34 +++++++++++
 .../runners/core/ReduceFnContextFactory.java    |  1 -
 .../beam/runners/core/ReduceFnRunner.java       |  1 -
 .../beam/runners/core/SideInputHandler.java     |  1 -
 .../beam/runners/core/SideInputReader.java      | 47 +++++++++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     |  1 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |  1 -
 .../core/SimplePushbackSideInputDoFnRunner.java |  1 -
 .../core/UnsupportedSideInputReader.java        |  1 -
 .../core/WindowingInternalsAdapters.java        |  1 -
 ...ndedSplittableProcessElementInvokerTest.java |  1 -
 .../beam/runners/core/ReduceFnRunnerTest.java   |  1 -
 .../beam/runners/core/ReduceFnTester.java       |  2 -
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  1 -
 .../SimplePushbackSideInputDoFnRunnerTest.java  |  1 -
 .../beam/runners/core/SplittableParDoTest.java  |  1 -
 .../runners/core/StatefulDoFnRunnerTest.java    |  1 -
 .../beam/runners/direct/EvaluationContext.java  |  4 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  2 +-
 .../beam/runners/direct/SideInputContainer.java |  4 +-
 ...littableProcessElementsEvaluatorFactory.java |  2 +-
 .../runners/direct/EvaluationContextTest.java   |  2 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  2 +-
 .../runners/direct/SideInputContainerTest.java  |  4 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  2 +-
 .../functions/AbstractFlinkCombineRunner.java   |  2 +-
 .../functions/FlinkSideInputReader.java         |  2 +-
 .../functions/HashingFlinkCombineRunner.java    |  2 +-
 .../functions/SortingFlinkCombineRunner.java    |  2 +-
 .../wrappers/streaming/DoFnOperator.java        |  4 +-
 .../beam/runners/spark/SparkRunnerDebugger.java |  1 -
 .../spark/aggregators/NamedAggregators.java     |  2 -
 .../beam/runners/spark/coders/CoderHelpers.java |  1 -
 .../spark/coders/StatelessJavaSerializer.java   |  1 -
 .../runners/spark/io/SparkUnboundedSource.java  |  1 -
 .../runners/spark/metrics/AggregatorMetric.java |  1 -
 .../spark/metrics/SparkBeamMetricSource.java    |  1 -
 .../spark/stateful/StateSpecFunctions.java      |  1 -
 .../translation/SparkAbstractCombineFn.java     |  2 +-
 .../spark/translation/TranslationUtils.java     |  1 -
 .../spark/util/GlobalWatermarkHolder.java       |  1 -
 .../spark/util/SparkSideInputReader.java        |  2 +-
 .../beam/sdk/util/CombineContextFactory.java    | 25 --------
 .../beam/sdk/util/NullSideInputReader.java      | 61 --------------------
 .../sdk/util/ReadyCheckingSideInputReader.java  | 34 -----------
 .../apache/beam/sdk/util/SideInputReader.java   | 47 ---------------
 .../harness/control/ProcessBundleHandler.java   |  2 +-
 70 files changed, 199 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
index aa6ef45..8cde692 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.apex;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
-
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunner;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 41fdb75..cc24ddd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -18,9 +18,7 @@
 package org.apache.beam.runners.apex;
 
 import com.datatorrent.api.DAG;
-
 import java.io.IOException;
-
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.ShutdownMode;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
index b84144c..18d8e94 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -25,7 +25,6 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -56,7 +55,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
-
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
index b3034ac..168cbf5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.datatorrent.api.InputOperator;
-
 import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
index eb81052..49ff49b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation;
 
 
 import java.io.Serializable;
-
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 9d4e9a2..85836ad 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternalsFactory;
@@ -49,7 +50,6 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 99ad964..8c516b1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -43,8 +43,10 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
@@ -56,8 +58,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index ac28c2a..1549560 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -26,9 +26,7 @@ import com.datatorrent.common.util.BaseOperator;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
-
 import java.io.IOException;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
index d08e76f..f6ce1d0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
@@ -19,12 +19,10 @@ package org.apache.beam.runners.apex.translation.utils;
 
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.netlet.util.Slice;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
index 395ad1f..2a72f17 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
@@ -22,9 +22,7 @@ import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
 import java.io.IOException;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 62c92a0..41f027f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -24,9 +24,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.IterableCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
index 6ffb091..68ec2ea 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -26,7 +26,6 @@ import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
-
 import java.io.File;
 import java.net.URI;
 import java.nio.file.FileSystem;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.jar.JarFile;
-
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index abe97f6..c590a2e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -23,9 +23,7 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.UnboundedSource;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index 83af61b..e76096e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -18,11 +18,9 @@
 package org.apache.beam.runners.apex.examples;
 
 import com.google.common.collect.Sets;
-
 import java.io.File;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index e31d830..206b430 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -20,9 +20,7 @@ package org.apache.beam.runners.apex.translation;
 import com.datatorrent.api.Sink;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
-
 import java.util.List;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.TestApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index 92812b4..288aade 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -25,9 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index f3dd9a3..ee3aefa 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -26,8 +26,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
index 5325ba6..4c312b4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 
 /**
  * An interface that runs a {@link GlobalCombineFn} with unified APIs.

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
index d45b503..d98bac8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
@@ -22,10 +22,10 @@ import java.util.Collection;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different
@@ -47,6 +47,33 @@ public class GlobalCombineFnRunners {
   }
 
   /**
+   * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader}, and
+   * the main input window.
+   */
+  private static CombineWithContext.Context createFromComponents(
+      final PipelineOptions options,
+      final SideInputReader sideInputReader,
+      final BoundedWindow mainInputWindow) {
+    return new CombineWithContext.Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return options;
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        if (!sideInputReader.contains(view)) {
+          throw new IllegalArgumentException("calling sideInput() with unknown view");
+        }
+
+        BoundedWindow sideInputWindow =
+            view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+        return sideInputReader.get(view, sideInputWindow);
+      }
+    };
+  }
+
+  /**
    * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}.
    *
    * <p>It forwards functions calls to the {@link CombineFn}.
@@ -136,7 +163,7 @@ public class GlobalCombineFnRunners {
         SideInputReader sideInputReader,
         Collection<? extends BoundedWindow> windows) {
       return combineFnWithContext.createAccumulator(
-          CombineContextFactory.createFromComponents(
+          createFromComponents(
               options, sideInputReader, Iterables.getOnlyElement(windows)));
     }
 
@@ -150,7 +177,7 @@ public class GlobalCombineFnRunners {
       return combineFnWithContext.addInput(
           accumulator,
           input,
-          CombineContextFactory.createFromComponents(
+          createFromComponents(
               options, sideInputReader, Iterables.getOnlyElement(windows)));
     }
 
@@ -162,7 +189,7 @@ public class GlobalCombineFnRunners {
         Collection<? extends BoundedWindow> windows) {
       return combineFnWithContext.mergeAccumulators(
           accumulators,
-          CombineContextFactory.createFromComponents(
+          createFromComponents(
               options, sideInputReader, Iterables.getOnlyElement(windows)));
     }
 
@@ -174,7 +201,7 @@ public class GlobalCombineFnRunners {
         Collection<? extends BoundedWindow> windows) {
       return combineFnWithContext.extractOutput(
           accumulator,
-          CombineContextFactory.createFromComponents(
+          createFromComponents(
               options, sideInputReader, Iterables.getOnlyElement(windows)));
     }
 
@@ -186,7 +213,7 @@ public class GlobalCombineFnRunners {
         Collection<? extends BoundedWindow> windows) {
       return combineFnWithContext.compact(
           accumulator,
-          CombineContextFactory.createFromComponents(
+          createFromComponents(
               options, sideInputReader, Iterables.getOnlyElement(windows)));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 4c1fe95..5b82d1f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java
new file mode 100644
index 0000000..786ed41
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link SideInputReader} representing a well-defined set of views, but not storing
+ * any values for them. Used to check if a side input is present when the data itself
+ * comes from elsewhere.
+ */
+public class NullSideInputReader implements SideInputReader {
+
+  private Set<PCollectionView<?>> views;
+
+  public static NullSideInputReader empty() {
+    return new NullSideInputReader(Collections.<PCollectionView<?>>emptySet());
+  }
+
+  public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) {
+    return new NullSideInputReader(views);
+  }
+
+  private NullSideInputReader(Iterable<? extends PCollectionView<?>> views) {
+    this.views = Sets.newHashSet(views);
+  }
+
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    throw new IllegalArgumentException("cannot call NullSideInputReader.get()");
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return views.isEmpty();
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return views.contains(view);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 16bdfa3..2db6531 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 7cbf0d2..61f413f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java
new file mode 100644
index 0000000..8d1f0e2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
+ * had its contents set in a window.
+ */
+public interface ReadyCheckingSideInputReader extends SideInputReader {
+  /**
+   * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+   */
+  boolean isReady(PCollectionView<?> view, BoundedWindow window);
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 63e8e6d..bd327f3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a04006b..d2ed835 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index af75010..539b9f0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java
new file mode 100644
index 0000000..7d1b829
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * The interface to objects that provide side inputs. Particular implementations
+ * may read a side input directly or use appropriate sorts of caching, etc.
+ */
+public interface SideInputReader {
+  /**
+   * Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}.
+   *
+   * <p>It is valid for a side input to be {@code null}. It is <i>not</i> valid for this to
+   * return {@code null} for any other reason.
+   */
+  @Nullable
+  <T> T get(PCollectionView<T> view, BoundedWindow window);
+
+  /**
+   * Returns true if the given {@link PCollectionView} is valid for this reader.
+   */
+  <T> boolean contains(PCollectionView<T> view);
+
+  /**
+   * Returns true if there are no side inputs in this reader.
+   */
+  boolean isEmpty();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 4057790..aab34a5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index cf9dff2..2a0b688 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 36a04d9..3f77f7d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
index 4230f8c..c1c7179 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.values.PCollectionView;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 1b36bf9..4a58445 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index 541e238..a2f6acc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 381a493..3dd98e0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 6fb0dcb..7de8f3b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -62,9 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index fae9117..abefd1c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index dabc4f0..79bf0b2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index b93ff3a..be4cf08 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -56,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 2ca8b05..76351e4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 75ff1c7..7b64611 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
@@ -42,8 +44,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index e5a6680..4cfd16f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 1e773c9..380dc65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -35,11 +35,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PCollectionViewWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 2c22caf..44f2e85 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StateInternals;
@@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 123e9f2..077bb68 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index ef8add9..88fd5d2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index d7c4440..d4ca9fd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -44,8 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index f6b652d..f1ba57c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -58,7 +59,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
index 4872a06..2ae7833 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
@@ -21,10 +21,10 @@ import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import org.apache.beam.runners.core.GlobalCombineFnRunner;
 import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index fa95477..f275290 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -23,8 +23,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
index 942bf42..feb8b39 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java
@@ -27,12 +27,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index fb4c678..026a35c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -23,13 +23,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1844d6d..f35ba7a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -34,8 +34,10 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
@@ -64,8 +66,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 8d47e1a..11c52c7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index cf6c9ad..c836ca5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -22,14 +22,12 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index 9c46ecf..85e3572 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -32,7 +32,6 @@ import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
-
 import scala.Tuple2;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
index 01b3b93..0cf4951 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
@@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 6b34590..0388f6c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -58,7 +58,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.dstream.DStream;
 import org.apache.spark.streaming.scheduler.StreamInputInfo;
 import org.joda.time.Instant;
-
 import scala.Tuple2;
 import scala.runtime.BoxedUnit;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
index 271cc6b..450bc95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark.metrics;
 
 import com.codahale.metrics.Metric;
-
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
index 9cab66d..5c6fc24 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark.metrics;
 
 import com.codahale.metrics.MetricRegistry;
-
 import org.apache.spark.metrics.source.Source;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 30ee639..d8d52c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -47,7 +47,6 @@ import org.apache.spark.streaming.StateSpec;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Option;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction3;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index df633b0..315f7fb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -29,13 +29,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index f462e60..993062c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -50,7 +50,6 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
-
 import scala.Tuple2;
 
 /** A set of utilities to help translating Beam transformations into Spark transformations. */

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 212f974..8b384d8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index e876e12..6c91088 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -24,8 +24,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index f93cb0b..b72fd82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.StateContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -64,28 +63,4 @@ public class CombineContextFactory {
     };
   }
 
-  /**
-   * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader},
-   * and the main input window.
-   */
-  public static Context createFromComponents(final PipelineOptions options,
-      final SideInputReader sideInputReader, final BoundedWindow mainInputWindow) {
-    return new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return options;
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        if (!sideInputReader.contains(view)) {
-          throw new IllegalArgumentException("calling sideInput() with unknown view");
-        }
-
-        BoundedWindow sideInputWindow =
-            view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
-        return sideInputReader.get(view, sideInputWindow);
-      }
-    };
-  }
 }