Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:23 UTC

[14/18] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
deleted file mode 100644
index 8f50105..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.util.UserCodeException;
-
-/**
- * Test Flink runner.
- */
-public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
-
-  private FlinkRunner delegate;
-
-  private TestFlinkRunner(FlinkPipelineOptions options) {
-    // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
-    options.setFlinkMaster("[auto]");
-    this.delegate = FlinkRunner.fromOptions(options);
-  }
-
-  public static TestFlinkRunner fromOptions(PipelineOptions options) {
-    FlinkPipelineOptions flinkOptions =
-        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-    return new TestFlinkRunner(flinkOptions);
-  }
-
-  public static TestFlinkRunner create(boolean streaming) {
-    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    flinkOptions.setRunner(TestFlinkRunner.class);
-    flinkOptions.setStreaming(streaming);
-    return TestFlinkRunner.fromOptions(flinkOptions);
-  }
-
-  @Override
-  public PipelineResult run(Pipeline pipeline) {
-    try {
-      return delegate.run(pipeline);
-    } catch (Throwable t) {
-      // Special case hack to pull out assertion errors from PAssert; instead there should
-      // probably be a better story along the lines of UserCodeException.
-      UserCodeException innermostUserCodeException = null;
-      Throwable current = t;
-      for (; current.getCause() != null; current = current.getCause()) {
-        if (current instanceof UserCodeException) {
-          innermostUserCodeException = ((UserCodeException) current);
-        }
-      }
-      if (innermostUserCodeException != null) {
-        current = innermostUserCodeException.getCause();
-      }
-      if (current instanceof AssertionError) {
-        throw (AssertionError) current;
-      }
-      throw new PipelineExecutionException(current);
-    }
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return delegate.getPipelineOptions();
-  }
-}
-
-
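
For context, a minimal usage sketch (not part of the commit above) of how the removed TestFlinkRunner is typically wired into a test pipeline; it assumes the standard Beam SDK entry points (PipelineOptionsFactory, Pipeline) already imported in the file above:

    // Hypothetical test setup; [auto] makes Flink pick a testing ExecutionEnvironment.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(TestFlinkRunner.class);
    options.setStreaming(false);
    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms and PAssert checks here ...
    pipeline.run();  // PAssert failures surface as AssertionError, see run() above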

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
deleted file mode 100644
index ad54750..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-/**
- * The translation mode of the Beam Pipeline.
- */
-enum TranslationMode {
-
-  /** Uses the batch mode of Flink. */
-  BATCH,
-
-  /** Uses the streaming mode of Flink. */
-  STREAMING
-
-}
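
A short sketch (assumed, not shown in this commit) of how a pipeline translator would typically pick the mode from the streaming flag on FlinkPipelineOptions:

    // Hypothetical selection based on FlinkPipelineOptions.isStreaming().
    TranslationMode mode =
        options.isStreaming() ? TranslationMode.STREAMING : TranslationMode.BATCH;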

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
deleted file mode 100644
index 57f1e59..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
deleted file mode 100644
index fb2493b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * An {@link AggregatorFactory} for the Flink Batch Runner.
- */
-public class FlinkAggregatorFactory implements AggregatorFactory {
-
-  private final RuntimeContext runtimeContext;
-
-  public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
-    this.runtimeContext = runtimeContext;
-  }
-
-  @Override
-  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-      Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
-      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-    @SuppressWarnings("unchecked")
-    SerializableFnAggregatorWrapper<InputT, OutputT> result =
-        (SerializableFnAggregatorWrapper<InputT, OutputT>)
-            runtimeContext.getAccumulator(aggregatorName);
-
-    if (result == null) {
-      result = new SerializableFnAggregatorWrapper<>(combine);
-      runtimeContext.addAccumulator(aggregatorName, result);
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
deleted file mode 100644
index 447b1e5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
- * Flink functions.
- */
-class FlinkAssignContext<InputT, W extends BoundedWindow>
-    extends WindowFn<InputT, W>.AssignContext {
-  private final WindowedValue<InputT> value;
-
-  FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
-    fn.super();
-    checkArgument(
-        Iterables.size(value.getWindows()) == 1,
-        String.format(
-            "%s passed to window assignment must be in a single window, but it was in %s: %s",
-            WindowedValue.class.getSimpleName(),
-            Iterables.size(value.getWindows()),
-            value.getWindows()));
-    this.value = value;
-  }
-
-  @Override
-  public InputT element() {
-    return value.getValue();
-  }
-
-  @Override
-  public Instant timestamp() {
-    return value.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    return Iterables.getOnlyElement(value.getWindows());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
deleted file mode 100644
index c3a5095..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Flink {@link FlatMapFunction} for implementing
- * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- */
-public class FlinkAssignWindows<T, W extends BoundedWindow>
-    implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
-
-  private final WindowFn<T, W> windowFn;
-
-  public FlinkAssignWindows(WindowFn<T, W> windowFn) {
-    this.windowFn = windowFn;
-  }
-
-  @Override
-  public void flatMap(
-      WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
-    Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
-    for (W window: windows) {
-      collector.collect(
-          WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
-    }
-  }
-}
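
For illustration, a sketch (assumed) of how this FlatMapFunction is applied during batch translation: the windowed input DataSet is flat-mapped through FlinkAssignWindows with the WindowFn taken from the Window.Assign transform. DataSet here is org.apache.flink.api.java.DataSet; the other types come from the imports above:

    // Hypothetical helper, not part of the commit.
    static <T, W extends BoundedWindow> DataSet<WindowedValue<T>> assignWindows(
        DataSet<WindowedValue<T>> input, WindowFn<T, W> windowFn) {
      return input.flatMap(new FlinkAssignWindows<>(windowFn));
    }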

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
deleted file mode 100644
index 51582af..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-/**
- * Encapsulates a {@link DoFn}
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- *
- * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
- * and must tag all outputs with the output number. Afterwards, a filter removes the
- * elements that do not belong to a specific output.
- */
-public class FlinkDoFnFunction<InputT, OutputT>
-    extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
-
-  private final SerializedPipelineOptions serializedOptions;
-
-  private final DoFn<InputT, OutputT> doFn;
-  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  private final WindowingStrategy<?, ?> windowingStrategy;
-
-  private final Map<TupleTag<?>, Integer> outputMap;
-  private final TupleTag<OutputT> mainOutputTag;
-
-  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
-
-  public FlinkDoFnFunction(
-      DoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions options,
-      Map<TupleTag<?>, Integer> outputMap,
-      TupleTag<OutputT> mainOutputTag) {
-
-    this.doFn = doFn;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
-    this.windowingStrategy = windowingStrategy;
-    this.outputMap = outputMap;
-    this.mainOutputTag = mainOutputTag;
-
-  }
-
-  @Override
-  public void mapPartition(
-      Iterable<WindowedValue<InputT>> values,
-      Collector<WindowedValue<OutputT>> out) throws Exception {
-
-    RuntimeContext runtimeContext = getRuntimeContext();
-
-    DoFnRunners.OutputManager outputManager;
-    if (outputMap == null) {
-      outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
-    } else {
-      // it has some additional outputs
-      outputManager =
-          new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
-    }
-
-    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), doFn,
-        new FlinkSideInputReader(sideInputs, runtimeContext),
-        outputManager,
-        mainOutputTag,
-        // see SimpleDoFnRunner, just use it to limit number of additional outputs
-        Collections.<TupleTag<?>>emptyList(),
-        new FlinkNoOpStepContext(),
-        new FlinkAggregatorFactory(runtimeContext),
-        windowingStrategy);
-
-    doFnRunner.startBundle();
-
-    for (WindowedValue<InputT> value : values) {
-      doFnRunner.processElement(value);
-    }
-
-    doFnRunner.finishBundle();
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
-  }
-
-  @Override
-  public void close() throws Exception {
-    doFnInvoker.invokeTeardown();
-  }
-
-  static class DoFnOutputManager
-      implements DoFnRunners.OutputManager {
-
-    private Collector collector;
-
-    DoFnOutputManager(Collector collector) {
-      this.collector = collector;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      collector.collect(output);
-    }
-  }
-
-  static class MultiDoFnOutputManager
-      implements DoFnRunners.OutputManager {
-
-    private Collector<WindowedValue<RawUnionValue>> collector;
-    private Map<TupleTag<?>, Integer> outputMap;
-
-    MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector,
-                      Map<TupleTag<?>, Integer> outputMap) {
-      this.collector = collector;
-      this.outputMap = outputMap;
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()),
-          output.getTimestamp(), output.getWindows(), output.getPane()));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
deleted file mode 100644
index 26fd0b4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- *
- * <p>This is different from the pair of functions for the non-merging windows case
- * in that we cannot do combining before the shuffle because elements would not
- * yet be in their correct windows for side-input access.
- */
-public class FlinkMergingNonShuffleReduceFunction<
-    K, InputT, AccumT, OutputT, W extends IntervalWindow>
-    extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
-
-  private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
-
-  private final WindowingStrategy<?, W> windowingStrategy;
-
-  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  private final SerializedPipelineOptions serializedOptions;
-
-  public FlinkMergingNonShuffleReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-
-    this.combineFn = keyedCombineFn;
-
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
-  }
-
-  @Override
-  public void reduce(
-      Iterable<WindowedValue<KV<K, InputT>>> elements,
-      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them; this has to fit into
-    // memory
-    // this seems very imprudent, but correct, for now
-    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
-    // create accumulator using the first element's key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator =
-        combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (currentWindow.equals(nextWindow)) {
-        // continue accumulating and merge windows
-
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
-
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, InputT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, InputT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-      }
-    }
-  }
-
-}
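
To make the window-merging pre-processing above concrete, a small worked example (assumed input) using the IntervalWindow API referenced in the code:

    // Three windows sorted by end timestamp, as mergeWindow() expects.
    IntervalWindow a = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow b = new IntervalWindow(new Instant(5), new Instant(15));
    IntervalWindow c = new IntervalWindow(new Instant(20), new Instant(30));
    a.intersects(b);          // true: the merged window becomes a.span(b), i.e. [0, 15)
    a.span(b).intersects(c);  // false: c starts a new merged window, so elements of a and b
                              // are retrofitted with [0, 15) while c keeps [20, 30)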

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
deleted file mode 100644
index c68f155..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow>
-    extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
-
-  public FlinkMergingPartialReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-    super(combineFn, windowingStrategy, sideInputs, pipelineOptions);
-  }
-
-  @Override
-  public void combine(
-      Iterable<WindowedValue<KV<K, InputT>>> elements,
-      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them; this has to fit into
-    // memory
-    // this seems very imprudent, but correct, for now
-    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
-    // create accumulator using the first element's key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator = combineFnRunner.createAccumulator(key,
-        options, sideInputReader, currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (currentWindow.equals(nextWindow)) {
-        // continue accumulating and merge windows
-
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, accumulator),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, accumulator),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, InputT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, InputT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
deleted file mode 100644
index 84b3adc..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow>
-    extends FlinkReduceFunction<K, AccumT, OutputT, W> {
-
-  public FlinkMergingReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-    super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions);
-  }
-
-  @Override
-  public void reduce(
-      Iterable<WindowedValue<KV<K, AccumT>>> elements,
-      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them; this has to fit into
-    // memory
-    // this seems very imprudent, but correct, for now
-    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
-      for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, AccumT>> o1,
-          WindowedValue<KV<K, AccumT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // merge windows, we have to do it in an extra pre-processing step and
-    // can't do it as we go since the window of early elements would not
-    // be correct when calling the CombineFn
-    mergeWindow(sortedInput);
-
-    // iterate over the elements that are sorted by window timestamp
-    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
-    // get the first accumulator
-    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
-    AccumT accumulator = currentValue.getValue().getValue();
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn,
-    // in FlinkPartialReduceFunction we already merge the timestamps assigned
-    // to individual elements; here we just merge them
-    List<Instant> windowTimestamps = new ArrayList<>();
-    windowTimestamps.add(currentValue.getTimestamp());
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating and merge windows
-
-        accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamps.add(nextValue.getTimestamp());
-      } else {
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                outputTimeFn.merge(currentWindow, windowTimestamps),
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        windowTimestamps.clear();
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        accumulator = nextValue.getValue().getValue();
-        windowTimestamps.add(nextValue.getTimestamp());
-      }
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            outputTimeFn.merge(currentWindow, windowTimestamps),
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-
-  /**
-   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
-   * This replaces windows in the input list.
-   */
-  private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) {
-    int currentStart = 0;
-    IntervalWindow currentWindow =
-        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
-    for (int i = 1; i < elements.size(); i++) {
-      WindowedValue<KV<K, AccumT>> nextValue = elements.get(i);
-      IntervalWindow nextWindow =
-          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-      if (currentWindow.intersects(nextWindow)) {
-        // we continue
-        currentWindow = currentWindow.span(nextWindow);
-      } else {
-        // retrofit the merged window to all windows up to "currentStart"
-        for (int j = i - 1; j >= currentStart; j--) {
-          WindowedValue<KV<K, AccumT>> value = elements.get(j);
-          elements.set(
-              j,
-              WindowedValue.of(
-                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-        }
-        currentStart = i;
-        currentWindow = nextWindow;
-      }
-    }
-    if (currentStart < elements.size() - 1) {
-      // we have to retrofit the last batch
-      for (int j = elements.size() - 1; j >= currentStart; j--) {
-        WindowedValue<KV<K, AccumT>> value = elements.get(j);
-        elements.set(
-            j,
-            WindowedValue.of(
-                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
deleted file mode 100644
index 9071cc5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * A {@link FlatMapFunction} that filters out those elements that don't belong in this
- * output. We need this to implement MultiOutput ParDo functions in combination with
- * {@link FlinkDoFnFunction}.
- */
-public class FlinkMultiOutputPruningFunction<T>
-    implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> {
-
-  private final int ourOutputTag;
-
-  public FlinkMultiOutputPruningFunction(int ourOutputTag) {
-    this.ourOutputTag = ourOutputTag;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(
-      WindowedValue<RawUnionValue> windowedValue,
-      Collector<WindowedValue<T>> collector) throws Exception {
-    int unionTag = windowedValue.getValue().getUnionTag();
-    if (unionTag == ourOutputTag) {
-      collector.collect(
-          (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue()));
-    }
-  }
-}
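
For context, a simplified sketch (assumed, not part of this commit) of how the batch translator chains FlinkDoFnFunction and FlinkMultiOutputPruningFunction for a multi-output ParDo: the DoFn side emits RawUnionValues tagged with an output index (via outputMap), and each output collection is then carved out with one pruning flat-map per tag:

    // Hypothetical helper; taggedOutput is the DataSet produced by a FlinkDoFnFunction
    // that was constructed with a non-null outputMap.
    static <T> DataSet<WindowedValue<T>> pruneOutput(
        DataSet<WindowedValue<RawUnionValue>> taggedOutput, int outputIndex) {
      return taggedOutput.flatMap(new FlinkMultiOutputPruningFunction<T>(outputIndex));
    }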

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
deleted file mode 100644
index 847a00a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * A {@link StepContext} for Flink Batch Runner execution.
- */
-public class FlinkNoOpStepContext implements StepContext {
-
-  @Override
-  public String getStepName() {
-    return null;
-  }
-
-  @Override
-  public String getTransformName() {
-    return null;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> output) {
-
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-
-  }
-
-  @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(
-      TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder,
-      W window,
-      Coder<W> windowCoder) throws IOException {
-  }
-
-  @Override
-  public StateInternals<?> stateInternals() {
-    return null;
-  }
-
-  @Override
-  public TimerInternals timerInternals() {
-    return null;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
deleted file mode 100644
index 1d1ff9f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupCombineFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * This is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
- * on Flink. The second step is {@link FlinkReduceFunction}. This function performs a local
- * combine step before shuffling, while the latter does the final combination after a shuffle.
- *
- * <p>The input to {@link #combine(Iterable, Collector)} consists of elements of the same key but
- * for different windows. We have to ensure that we only combine elements of matching
- * windows.
- */
-public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
-    extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
-
-  protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
-
-  protected final WindowingStrategy<?, W> windowingStrategy;
-
-  protected final SerializedPipelineOptions serializedOptions;
-
-  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  public FlinkPartialReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-
-    this.combineFn = combineFn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
-  }
-
-  @Override
-  public void combine(
-      Iterable<WindowedValue<KV<K, InputT>>> elements,
-      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-    // get all elements so that we can sort them; they have to fit into
-    // memory
-    // this seems very imprudent, but it is correct, for now
-    ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
-      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, InputT>> o1,
-          WindowedValue<KV<K, InputT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // iterate over the elements that are sorted by window timestamp
-    //
-    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
-    // create an accumulator using the first element's key
-    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
-    InputT firstValue = currentValue.getValue().getValue();
-    AccumT accumulator = combineFnRunner.createAccumulator(key,
-        options, sideInputReader, currentValue.getWindows());
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
-        options, sideInputReader, currentValue.getWindows());
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn
-    Instant windowTimestamp =
-        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamp = outputTimeFn.combine(
-            windowTimestamp,
-            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, accumulator),
-                windowTimestamp,
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        InputT value = nextValue.getValue().getValue();
-        accumulator = combineFnRunner.createAccumulator(key,
-            options, sideInputReader, currentValue.getWindows());
-        accumulator = combineFnRunner.addInput(key, accumulator, value,
-            options, sideInputReader, currentValue.getWindows());
-        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
-      }
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, accumulator),
-            windowTimestamp,
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-}
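
For reference, FlinkPartialReduceFunction above and FlinkReduceFunction further below rely on the
same pattern: explode each element into one value per window, sort by the window's maximum
timestamp, and keep a single running accumulator until the window changes. The standalone sketch
below illustrates that pattern with plain long sums; the WindowedLong type is a hypothetical
stand-in for a WindowedValue that sits in exactly one window, there are no Beam or Flink
dependencies, and, like the removed code, it assumes the per-key input is non-empty.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedWindowCombineSketch {

  // Hypothetical stand-in for a WindowedValue<KV<String, Long>> that sits in exactly one window.
  static final class WindowedLong {
    final String key;
    final long value;
    final long windowMaxTimestamp;

    WindowedLong(String key, long value, long windowMaxTimestamp) {
      this.key = key;
      this.value = value;
      this.windowMaxTimestamp = windowMaxTimestamp;
    }
  }

  // Combines the (already grouped) values of one key: sort by window, then accumulate per window.
  static List<WindowedLong> combinePerWindow(List<WindowedLong> elements) {
    List<WindowedLong> sorted = new ArrayList<>(elements);
    sorted.sort(Comparator.comparingLong((WindowedLong e) -> e.windowMaxTimestamp));

    List<WindowedLong> output = new ArrayList<>();
    String key = sorted.get(0).key;
    long currentWindow = sorted.get(0).windowMaxTimestamp;
    long accumulator = 0;

    for (WindowedLong element : sorted) {
      if (element.windowMaxTimestamp != currentWindow) {
        // window boundary: emit the finished accumulator and start a fresh one
        output.add(new WindowedLong(key, accumulator, currentWindow));
        currentWindow = element.windowMaxTimestamp;
        accumulator = 0;
      }
      accumulator += element.value;
    }
    output.add(new WindowedLong(key, accumulator, currentWindow));
    return output;
  }
}

Sorting by window first is what allows a single pass with one live accumulator instead of a map
from window to accumulator, at the cost of materializing all values of the key in memory, as the
comments in the removed code point out.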

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
deleted file mode 100644
index 3e4f742..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * This is the second step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
- * on Flink; the first step is {@link FlinkPartialReduceFunction}. This function performs the final
- * combination of the pre-combined values after a shuffle.
- *
- * <p>The input to {@link #reduce(Iterable, Collector)} consists of elements of the same key but
- * for different windows. We have to ensure that we only combine elements of matching
- * windows.
- */
-public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
-    extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
-
-  protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
-
-  protected final WindowingStrategy<?, W> windowingStrategy;
-
-  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  protected final SerializedPipelineOptions serializedOptions;
-
-  public FlinkReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
-      WindowingStrategy<?, W> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions) {
-
-    this.combineFn = keyedCombineFn;
-
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
-  }
-
-  @Override
-  public void reduce(
-      Iterable<WindowedValue<KV<K, AccumT>>> elements,
-      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
-    PipelineOptions options = serializedOptions.getPipelineOptions();
-
-    FlinkSideInputReader sideInputReader =
-        new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
-
-    @SuppressWarnings("unchecked")
-    OutputTimeFn<? super BoundedWindow> outputTimeFn =
-        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-
-    // get all elements so that we can sort them; they have to fit into
-    // memory
-    // this seems very imprudent, but it is correct, for now
-    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
-      for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
-        sortedInput.add(exploded);
-      }
-    }
-    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
-      @Override
-      public int compare(
-          WindowedValue<KV<K, AccumT>> o1,
-          WindowedValue<KV<K, AccumT>> o2) {
-        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
-            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
-      }
-    });
-
-    // iterate over the elements that are sorted by window timestamp
-    //
-    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
-    // get the first accumulator
-    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
-    K key = currentValue.getValue().getKey();
-    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
-    AccumT accumulator = currentValue.getValue().getValue();
-
-    // we use this to keep track of the timestamps assigned by the OutputTimeFn;
-    // FlinkPartialReduceFunction already combined the timestamps assigned to the
-    // individual elements, so here we only merge them per window
-    List<Instant> windowTimestamps = new ArrayList<>();
-    windowTimestamps.add(currentValue.getTimestamp());
-
-    while (iterator.hasNext()) {
-      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
-      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
-      if (nextWindow.equals(currentWindow)) {
-        // continue accumulating
-        accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
-            options, sideInputReader, currentValue.getWindows());
-
-        windowTimestamps.add(nextValue.getTimestamp());
-      } else {
-        // emit the value that we currently have
-        out.collect(
-            WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                    options, sideInputReader, currentValue.getWindows())),
-                outputTimeFn.merge(currentWindow, windowTimestamps),
-                currentWindow,
-                PaneInfo.NO_FIRING));
-
-        windowTimestamps.clear();
-
-        currentWindow = nextWindow;
-        currentValue = nextValue;
-        accumulator = nextValue.getValue().getValue();
-        windowTimestamps.add(nextValue.getTimestamp());
-      }
-
-    }
-
-    // emit the final accumulator
-    out.collect(
-        WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
-                options, sideInputReader, currentValue.getWindows())),
-            outputTimeFn.merge(currentWindow, windowTimestamps),
-            currentWindow,
-            PaneInfo.NO_FIRING));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
deleted file mode 100644
index c317182..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * A {@link SideInputReader} for the Flink Batch Runner.
- */
-public class FlinkSideInputReader implements SideInputReader {
-
-  private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  private RuntimeContext runtimeContext;
-
-  public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
-                              RuntimeContext runtimeContext) {
-    sideInputs = new HashMap<>();
-    for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
-      sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
-    }
-    this.runtimeContext = runtimeContext;
-  }
-
-  @Nullable
-  @Override
-  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    checkNotNull(view, "View passed to sideInput cannot be null");
-    TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
-    checkNotNull(
-        sideInputs.get(tag),
-        "Side input for " + view + " not available.");
-
-    Map<BoundedWindow, T> sideInputs =
-        runtimeContext.getBroadcastVariableWithInitializer(
-            tag.getId(), new SideInputInitializer<>(view));
-    T result = sideInputs.get(window);
-    if (result == null) {
-      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
-    }
-    return result;
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    return sideInputs.containsKey(view.getTagInternal());
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return sideInputs.isEmpty();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
deleted file mode 100644
index c8193d2..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner.
- */
-public class FlinkStatefulDoFnFunction<K, V, OutputT>
-    extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
-
-  private final DoFn<KV<K, V>, OutputT> dofn;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-  private final SerializedPipelineOptions serializedOptions;
-  private final Map<TupleTag<?>, Integer> outputMap;
-  private final TupleTag<OutputT> mainOutputTag;
-  private transient DoFnInvoker doFnInvoker;
-
-  public FlinkStatefulDoFnFunction(
-      DoFn<KV<K, V>, OutputT> dofn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      PipelineOptions pipelineOptions,
-      Map<TupleTag<?>, Integer> outputMap,
-      TupleTag<OutputT> mainOutputTag) {
-
-    this.dofn = dofn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-    this.outputMap = outputMap;
-    this.mainOutputTag = mainOutputTag;
-  }
-
-  @Override
-  public void reduce(
-      Iterable<WindowedValue<KV<K, V>>> values,
-      Collector<WindowedValue<OutputT>> out) throws Exception {
-    RuntimeContext runtimeContext = getRuntimeContext();
-
-    DoFnRunners.OutputManager outputManager;
-    if (outputMap == null) {
-      outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
-    } else {
-      // it has some additional Outputs
-      outputManager =
-          new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
-    }
-
-    final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator();
-
-    // get the first value; we need it to initialize the state internals with the key.
-    // we are guaranteed to have a first value, otherwise reduce() would not have been called.
-    WindowedValue<KV<K, V>> currentValue = iterator.next();
-    final K key = currentValue.getValue().getKey();
-
-    final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key);
-
-    // In batch mode we know that all the data for this key is available. We can't use the
-    // timer manager from the context because it doesn't exist, so we create one and advance
-    // time to the end after processing all elements.
-    final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-    timerInternals.advanceProcessingTime(Instant.now());
-    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
-    DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), dofn,
-        new FlinkSideInputReader(sideInputs, runtimeContext),
-        outputManager,
-        mainOutputTag,
-        // see SimpleDoFnRunner; this is only used to limit the number of additional outputs
-        Collections.<TupleTag<?>>emptyList(),
-        new FlinkNoOpStepContext() {
-          @Override
-          public StateInternals<?> stateInternals() {
-            return stateInternals;
-          }
-          @Override
-          public TimerInternals timerInternals() {
-            return timerInternals;
-          }
-        },
-        new FlinkAggregatorFactory(runtimeContext),
-        windowingStrategy);
-
-    doFnRunner.startBundle();
-
-    doFnRunner.processElement(currentValue);
-    while (iterator.hasNext()) {
-      currentValue = iterator.next();
-      doFnRunner.processElement(currentValue);
-    }
-
-    // Finish any pending windows by advancing the input watermark to infinity.
-    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    // Finally, advance the processing time to infinity to fire any timers.
-    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    fireEligibleTimers(timerInternals, doFnRunner);
-
-    doFnRunner.finishBundle();
-  }
-
-  private void fireEligibleTimers(
-      InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner)
-      throws Exception {
-
-    while (true) {
-
-      TimerInternals.TimerData timer;
-      boolean hasFired = false;
-
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        hasFired = true;
-        fireTimer(timer, runner);
-      }
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        hasFired = true;
-        fireTimer(timer, runner);
-      }
-      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
-        hasFired = true;
-        fireTimer(timer, runner);
-      }
-      if (!hasFired) {
-        break;
-      }
-    }
-  }
-
-  private void fireTimer(
-      TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
-    StateNamespace namespace = timer.getNamespace();
-    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-    doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    doFnInvoker = DoFnInvokers.invokerFor(dofn);
-    doFnInvoker.invokeSetup();
-  }
-
-  @Override
-  public void close() throws Exception {
-    doFnInvoker.invokeTeardown();
-  }
-
-}
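
The timer handling above is a pattern of its own: in a bounded (batch) job there is no real
watermark, so after the last element both the input watermark and the processing time are pushed
to the end of time, and every timer that becomes eligible is fired in a loop, because firing one
timer may register another. A rough standalone sketch of that loop, using a hypothetical Timer
type and a plain PriorityQueue in place of InMemoryTimerInternals, with no Beam or Flink
dependencies:

import java.util.PriorityQueue;

public class BatchTimerDrainSketch {

  // Hypothetical stand-in for TimerInternals.TimerData: just an id and a firing timestamp.
  static final class Timer implements Comparable<Timer> {
    final String id;
    final long timestamp;

    Timer(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }

    @Override
    public int compareTo(Timer other) {
      return Long.compare(this.timestamp, other.timestamp);
    }
  }

  private final PriorityQueue<Timer> eventTimers = new PriorityQueue<>();
  private long inputWatermark = Long.MIN_VALUE;

  void setTimer(Timer timer) {
    eventTimers.add(timer);
  }

  // In batch we jump straight to the end of time once the input is exhausted.
  void advanceInputWatermarkToInfinity() {
    inputWatermark = Long.MAX_VALUE;
  }

  // Fire every eligible timer; keep looping because firing a timer may set further timers.
  void fireEligibleTimers() {
    boolean hasFired = true;
    while (hasFired) {
      hasFired = false;
      while (!eventTimers.isEmpty() && eventTimers.peek().timestamp <= inputWatermark) {
        hasFired = true;
        fireTimer(eventTimers.poll());
      }
    }
  }

  private void fireTimer(Timer timer) {
    // placeholder: the real function dispatches to DoFnRunner#onTimer for the timer's window
    System.out.println("firing " + timer.id + " at " + timer.timestamp);
  }
}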

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
deleted file mode 100644
index 12222b4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-
-/**
- * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
- * from window to side input.
- */
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
-    implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
-
-  PCollectionView<ViewT> view;
-
-  public SideInputInitializer(PCollectionView<ViewT> view) {
-    this.view = view;
-  }
-
-  @Override
-  public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
-      Iterable<WindowedValue<ElemT>> inputValues) {
-
-    // first partition into windows
-    Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
-    for (WindowedValue<ElemT> value: inputValues) {
-      for (BoundedWindow window: value.getWindows()) {
-        List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
-        if (windowedValues == null) {
-          windowedValues = new ArrayList<>();
-          partitionedElements.put(window, windowedValues);
-        }
-        windowedValues.add(value);
-      }
-    }
-
-    Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
-
-    for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
-        partitionedElements.entrySet()) {
-
-      @SuppressWarnings("unchecked")
-      Iterable<WindowedValue<?>> elementsIterable =
-          (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
-      resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
-    }
-
-    return resultMap;
-  }
-}
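
The initializer above is essentially a group-by-window over the broadcast elements, followed by
one ViewFn application per group. A standalone sketch of the grouping half, assuming for
simplicity that every element sits in exactly one window, using a hypothetical Windowed type and
no Beam or Flink dependencies:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SideInputByWindowSketch {

  // Hypothetical stand-in for a WindowedValue<String> that sits in exactly one window.
  static final class Windowed {
    final String window;
    final String value;

    Windowed(String window, String value) {
      this.window = window;
      this.value = value;
    }
  }

  // Groups the broadcast input per window; the real initializer additionally applies the
  // view's ViewFn to each group to materialize the per-window side input value.
  static Map<String, List<String>> partitionByWindow(List<Windowed> inputValues) {
    return inputValues.stream()
        .collect(Collectors.groupingBy(
            element -> element.window,
            Collectors.mapping(element -> element.value, Collectors.toList())));
  }
}

Windows for which the broadcast input contains no elements are handled by the fallback in
FlinkSideInputReader#get above, which applies the ViewFn to an empty list.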

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
deleted file mode 100644
index 9f11212..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.functions;