Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:12 UTC

[38/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
deleted file mode 100644
index 681459a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-
-/**
- * AutoService registrar - registers FlinkRunner and FlinkPipelineOptions
- * as available pipeline runner services.
- *
- * <p>It ends up in META-INF/services and gets picked up by Beam.
- *
- */
-public class FlinkRunnerRegistrar {
-  private FlinkRunnerRegistrar() { }
-
-  /**
-   * Pipeline runner registrar.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          FlinkRunner.class,
-          TestFlinkRunner.class);
-    }
-  }
-
-  /**
-   * Pipeline options registrar.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
-    }
-  }
-}

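For context on the registrar mechanism described in the Javadoc above: because the Runner and Options classes were published through META-INF/services, user code could select the runner by its simple class name. A minimal sketch, assuming the Beam Java SDK and this Flink runner on the classpath (the empty pipeline is illustrative only):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RegistrarUsage {
      public static void main(String[] args) {
        // ServiceLoader picks up the AutoService-registered Runner and Options
        // classes, so "--runner=FlinkRunner" resolves without importing FlinkRunner.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--runner=FlinkRunner").create();
        Pipeline p = Pipeline.create(options);
        p.run();
      }
    }
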
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
deleted file mode 100644
index 0682b56..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.joda.time.Duration;
-
-/**
- * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
- * has methods to query the job runtime and the final values of
- * {@link org.apache.beam.sdk.transforms.Aggregator}s.
- */
-public class FlinkRunnerResult implements PipelineResult {
-
-  private final Map<String, Object> aggregators;
-
-  private final long runtime;
-
-  FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
-    this.aggregators = (aggregators == null || aggregators.isEmpty())
-        ? Collections.<String, Object>emptyMap()
-        : Collections.unmodifiableMap(aggregators);
-    this.runtime = runtime;
-  }
-
-  @Override
-  public State getState() {
-    return State.DONE;
-  }
-
-  @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    // TODO provide a list of all accumulator step values
-    Object value = aggregators.get(aggregator.getName());
-    if (value != null) {
-      return new AggregatorValues<T>() {
-        @Override
-        public Map<String, T> getValuesAtSteps() {
-          return (Map<String, T>) aggregators;
-        }
-      };
-    } else {
-      throw new AggregatorRetrievalException("Accumulator results not found.",
-          new RuntimeException("Accumulator does not exist."));
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "FlinkRunnerResult{"
-        + "aggregators=" + aggregators
-        + ", runtime=" + runtime
-        + '}';
-  }
-
-  @Override
-  public State cancel() throws IOException {
-    throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
-  }
-
-  @Override
-  public State waitUntilFinish() {
-    return State.DONE;
-  }
-
-  @Override
-  public State waitUntilFinish(Duration duration) {
-    return State.DONE;
-  }
-
-  @Override
-  public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
-  }
-}

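For context on how callers consumed this result type: the Flink batch runner only returned after the job had completed, which is why the state-related methods above are trivial. A minimal sketch, assuming the same SDK version (the empty pipeline is illustrative only):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ResultUsage {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs("--runner=FlinkRunner").create());
        // run() blocked until the batch job finished, so the returned
        // FlinkRunnerResult was already terminal:
        PipelineResult result = p.run();
        System.out.println(result.getState());   // DONE
        result.waitUntilFinish();                 // returned DONE immediately
        // result.metrics() threw UnsupportedOperationException, per the code above.
      }
    }
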
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
deleted file mode 100644
index 0459ef7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
- * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
- * {@link org.apache.flink.streaming.api.datastream.DataStream}-based one.
- *
- */
-class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
-
-  /** The necessary context in the case of a streaming job. */
-  private final FlinkStreamingTranslationContext streamingContext;
-
-  private int depth = 0;
-
-  private FlinkRunner flinkRunner;
-
-  public FlinkStreamingPipelineTranslator(
-      FlinkRunner flinkRunner,
-      StreamExecutionEnvironment env,
-      PipelineOptions options) {
-    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
-    this.flinkRunner = flinkRunner;
-  }
-
-  @Override
-  public void translate(Pipeline pipeline) {
-    List<PTransformOverride> transformOverrides =
-        ImmutableList.<PTransformOverride>builder()
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.splittableParDoMulti(),
-                    new SplittableParDoOverrideFactory()))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsIterable.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsList.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsMap.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
-            // this has to be last since the ViewAsSingleton override
-            // can expand to a Combine.GloballyAsSingletonView
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
-                        flinkRunner)))
-            .build();
-
-    pipeline.replaceAll(transformOverrides);
-    super.translate(pipeline);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-    this.depth++;
-
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null) {
-      StreamTransformTranslator<?> translator =
-          FlinkStreamingTransformTranslators.getTranslator(transform);
-
-      if (translator != null && applyCanTranslate(transform, node, translator)) {
-        applyStreamingTransform(transform, node, translator);
-        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
-        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-      }
-    }
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-    // get the transformation corresponding to the node we are
-    // currently visiting and translate it into its Flink alternative.
-
-    PTransform<?, ?> transform = node.getTransform();
-    StreamTransformTranslator<?> translator =
-        FlinkStreamingTransformTranslators.getTranslator(transform);
-
-    if (translator == null || !applyCanTranslate(transform, node, translator)) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException(
-          "The transform " + transform + " is currently not supported.");
-    }
-    applyStreamingTransform(transform, node, translator);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    // do nothing here
-  }
-
-  private <T extends PTransform<?, ?>> void applyStreamingTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      StreamTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the streamingContext
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, streamingContext);
-  }
-
-  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      StreamTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
-
-    return typedTranslator.canTranslate(typedTransform, streamingContext);
-  }
-
-  /**
-   * The interface that every Flink translator of a Beam operator should implement.
-   * This interface is for <b>streaming</b> jobs. For examples of such translators see
-   * {@link FlinkStreamingTransformTranslators}.
-   */
-  abstract static class StreamTransformTranslator<T extends PTransform> {
-
-    /**
-     * Translate the given transform.
-     */
-    abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
-
-    /**
-     * Returns true iff this translator can translate the given transform.
-     */
-    boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
-      return true;
-    }
-  }
-
-  private static class ReflectiveOneToOneOverrideFactory<
-          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
-      extends SingleInputOutputOverrideFactory<
-          PCollection<InputT>, PCollection<OutputT>, TransformT> {
-    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
-    private final FlinkRunner runner;
-
-    private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
-        FlinkRunner runner) {
-      this.replacement = replacement;
-      this.runner = runner;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
-        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          InstanceBuilder.ofType(replacement)
-              .withArg(FlinkRunner.class, runner)
-              .withArg(
-                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
-                      transform.getTransform().getClass(),
-                  transform.getTransform())
-              .build());
-    }
-  }
-
-  /**
-   * A {@link PTransformOverrideFactory} that overrides a <a
-   * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
-   */
-  static class SplittableParDoOverrideFactory<InputT, OutputT>
-      implements PTransformOverrideFactory<
-          PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollectionTuple>
-        getReplacementTransform(
-            AppliedPTransform<
-                    PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
-                transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          new SplittableParDo<>(transform.getTransform()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
-      return ReplacementOutputs.tagged(outputs, newOutput);
-    }
-  }
-}

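For context on the override list built in translate() above: each PTransformOverride pairs a matcher with a factory that swaps in a runner-specific implementation before translation begins. A minimal sketch of the same replaceAll idiom (MyTransform and MyOverrideFactory are hypothetical placeholders, not Beam or Flink-runner API):

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.runners.core.construction.PTransformMatchers;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.runners.PTransformOverride;

    class OverrideSketch {
      static void rewrite(Pipeline pipeline) {
        // Replace every application of MyTransform with the factory's
        // replacement before the pipeline is visited for translation.
        pipeline.replaceAll(
            ImmutableList.of(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(MyTransform.class),
                    new MyOverrideFactory())));
      }
    }
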
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
deleted file mode 100644
index 123d5e7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.runners.core.ElementAndRestriction;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.join.UnionCoder;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class contains all the mappings between Beam and Flink
- * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
- * traverses the Beam job and comes here to translate the encountered Beam transformations
- * into Flink ones, based on the mapping available in this class.
- */
-class FlinkStreamingTransformTranslators {
-
-  // --------------------------------------------------------------------------------------------
-  //  Transform Translator Registry
-  // --------------------------------------------------------------------------------------------
-
-  @SuppressWarnings("rawtypes")
-  private static final Map<
-      Class<? extends PTransform>,
-      FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
-
-  // here you can find all the available translators.
-  static {
-    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
-    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
-    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
-    TRANSLATORS.put(
-        SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
-    TRANSLATORS.put(
-        SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
-
-
-    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
-    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
-    TRANSLATORS.put(
-        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
-        new CreateViewStreamingTranslator());
-
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-  }
-
-  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
-      PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Transformation Implementations
-  // --------------------------------------------------------------------------------------------
-
-  private static class TextIOWriteBoundStreamingTranslator
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
-
-    private static final Logger LOG =
-        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
-    @Override
-    public void translateNode(
-        TextIO.Write.Bound transform,
-        FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
-          needsValidation);
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      DataStream<String> dataSink = inputDataStream
-          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
-            @Override
-            public void flatMap(
-                WindowedValue<String> value,
-                Collector<String> out)
-                throws Exception {
-              out.collect(value.getValue());
-            }
-          });
-      DataStreamSink<String> output =
-          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
-      if (numShards > 0) {
-        output.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class UnboundedReadSourceTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
-
-    @Override
-    public void translateNode(
-        Read.Unbounded<T> transform,
-        FlinkStreamingTranslationContext context) {
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      DataStream<WindowedValue<T>> source;
-      try {
-        UnboundedSourceWrapper<T, ?> sourceWrapper =
-            new UnboundedSourceWrapper<>(
-                context.getPipelineOptions(),
-                transform.getSource(),
-                context.getExecutionEnvironment().getParallelism());
-        source = context
-            .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Error while translating UnboundedSource: " + transform.getSource(), e);
-      }
-
-      context.setOutputDataStream(output, source);
-    }
-  }
-
-  private static class BoundedReadSourceTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(
-        Read.Bounded<T> transform,
-        FlinkStreamingTranslationContext context) {
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-
-      DataStream<WindowedValue<T>> source;
-      try {
-        BoundedSourceWrapper<T> sourceWrapper =
-            new BoundedSourceWrapper<>(
-                context.getPipelineOptions(),
-                transform.getSource(),
-                context.getExecutionEnvironment().getParallelism());
-        source = context
-            .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Error while translating BoundedSource: " + transform.getSource(), e);
-      }
-
-      context.setOutputDataStream(output, source);
-    }
-  }
-
-  /**
-   * Wraps each element in a {@link RawUnionValue} with the given tag id.
-   */
-  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
-    private final int intTag;
-
-    public ToRawUnion(int intTag) {
-      this.intTag = intTag;
-    }
-
-    @Override
-    public RawUnionValue map(T o) throws Exception {
-      return new RawUnionValue(intTag, o);
-    }
-  }
-
-  private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
-        transformSideInputs(
-          Collection<PCollectionView<?>> sideInputs,
-          FlinkStreamingTranslationContext context) {
-
-    // collect all side inputs
-    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
-    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
-    int count = 0;
-    for (PCollectionView<?> sideInput: sideInputs) {
-      TupleTag<?> tag = sideInput.getTagInternal();
-      intToViewMapping.put(count, sideInput);
-      tagToIntMapping.put(tag, count);
-      count++;
-      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
-    }
-
-
-    List<Coder<?>> inputCoders = new ArrayList<>();
-    for (PCollectionView<?> sideInput: sideInputs) {
-      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
-      TypeInformation<Object> tpe = sideInputStream.getType();
-      if (!(tpe instanceof CoderTypeInformation)) {
-        throw new IllegalStateException(
-            "Input Stream TypeInformation is no CoderTypeInformation.");
-      }
-
-      Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
-      inputCoders.add(coder);
-    }
-
-    UnionCoder unionCoder = UnionCoder.of(inputCoders);
-
-    CoderTypeInformation<RawUnionValue> unionTypeInformation =
-        new CoderTypeInformation<>(unionCoder);
-
-    // transform each side input to RawUnionValue and union them
-    DataStream<RawUnionValue> sideInputUnion = null;
-
-    for (PCollectionView<?> sideInput: sideInputs) {
-      TupleTag<?> tag = sideInput.getTagInternal();
-      final int intTag = tagToIntMapping.get(tag);
-      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
-      DataStream<RawUnionValue> unionValueStream =
-          sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
-
-      if (sideInputUnion == null) {
-        sideInputUnion = unionValueStream;
-      } else {
-        sideInputUnion = sideInputUnion.union(unionValueStream);
-      }
-    }
-
-    if (sideInputUnion == null) {
-      throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
-    }
-
-    return new Tuple2<>(intToViewMapping, sideInputUnion);
-  }
-
-  /**
-   * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
-   */
-  static class ParDoTranslationHelper {
-
-    interface DoFnOperatorFactory<InputT, OutputT> {
-      DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
-          DoFn<InputT, OutputT> doFn,
-          List<PCollectionView<?>> sideInputs,
-          TupleTag<OutputT> mainOutputTag,
-          List<TupleTag<?>> additionalOutputTags,
-          FlinkStreamingTranslationContext context,
-          WindowingStrategy<?, ?> windowingStrategy,
-          Map<TupleTag<?>, Integer> tagsToLabels,
-          Coder<WindowedValue<InputT>> inputCoder,
-          Coder keyCoder,
-          Map<Integer, PCollectionView<?>> transformedSideInputs);
-    }
-
-    static <InputT, OutputT> void translateParDo(
-        String transformName,
-        DoFn<InputT, OutputT> doFn,
-        PCollection<InputT> input,
-        List<PCollectionView<?>> sideInputs,
-        Map<TupleTag<?>, PValue> outputs,
-        TupleTag<OutputT> mainOutputTag,
-        List<TupleTag<?>> additionalOutputTags,
-        FlinkStreamingTranslationContext context,
-        DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
-
-      // we assume that the transformation does not change the windowing strategy.
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      Map<TupleTag<?>, Integer> tagsToLabels =
-          transformTupleTagsToLabels(mainOutputTag, outputs);
-
-      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
-
-      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
-
-      DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
-
-      Coder keyCoder = null;
-      boolean stateful = false;
-      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-      if (signature.stateDeclarations().size() > 0
-          || signature.timerDeclarations().size() > 0) {
-        // Based on the fact that the signature is stateful, DoFnSignatures ensures
-        // that it is also keyed
-        keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
-        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
-        stateful = true;
-      } else if (doFn instanceof SplittableParDo.ProcessFn) {
-        // we know that it is keyed on String
-        keyCoder = StringUtf8Coder.of();
-        stateful = true;
-      }
-
-      if (sideInputs.isEmpty()) {
-        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
-            doFnOperatorFactory.createDoFnOperator(
-                doFn,
-                sideInputs,
-                mainOutputTag,
-                additionalOutputTags,
-                context,
-                windowingStrategy,
-                tagsToLabels,
-                inputCoder,
-                keyCoder,
-                new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
-
-        UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
-        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
-            new CoderTypeInformation<>(outputUnionCoder);
-
-        unionOutputStream = inputDataStream
-            .transform(transformName, outputUnionTypeInformation, doFnOperator);
-
-      } else {
-        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
-            transformSideInputs(sideInputs, context);
-
-        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
-            doFnOperatorFactory.createDoFnOperator(
-                doFn,
-                sideInputs,
-                mainOutputTag,
-                additionalOutputTags,
-                context,
-                windowingStrategy,
-                tagsToLabels,
-                inputCoder,
-                keyCoder,
-                transformedSideInputs.f0);
-
-        UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
-        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
-            new CoderTypeInformation<>(outputUnionCoder);
-
-        if (stateful) {
-          // we have to manually construct the two-input transform because,
-          // normally, we're not allowed to have only one input keyed.
-          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
-          TwoInputTransformation<
-              WindowedValue<KV<?, InputT>>,
-              RawUnionValue,
-              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
-              keyedStream.getTransformation(),
-              transformedSideInputs.f1.broadcast().getTransformation(),
-              transformName,
-              (TwoInputStreamOperator) doFnOperator,
-              outputUnionTypeInformation,
-              keyedStream.getParallelism());
-
-          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
-          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
-
-          unionOutputStream = new SingleOutputStreamOperator(
-                  keyedStream.getExecutionEnvironment(),
-                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
-
-          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
-
-        } else {
-          unionOutputStream = inputDataStream
-              .connect(transformedSideInputs.f1.broadcast())
-              .transform(transformName, outputUnionTypeInformation, doFnOperator);
-        }
-      }
-
-      SplitStream<RawUnionValue> splitStream = unionOutputStream
-              .split(new OutputSelector<RawUnionValue>() {
-                @Override
-                public Iterable<String> select(RawUnionValue value) {
-                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
-                }
-              });
-
-      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-        final int outputTag = tagsToLabels.get(output.getKey());
-
-        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
-
-        @SuppressWarnings("unchecked")
-        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
-          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
-            @Override
-            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
-              out.collect(value.getValue());
-            }
-          }).returns(outputTypeInfo);
-
-        context.setOutputDataStream(output.getValue(), unwrapped);
-      }
-    }
-
-    private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
-        TupleTag<?> mainTag,
-        Map<TupleTag<?>, PValue> allTaggedValues) {
-
-      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
-      int count = 0;
-      tagToLabelMap.put(mainTag, count++);
-      for (TupleTag<?> key : allTaggedValues.keySet()) {
-        if (!tagToLabelMap.containsKey(key)) {
-          tagToLabelMap.put(key, count++);
-        }
-      }
-      return tagToLabelMap;
-    }
-
-    private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PValue taggedColl : taggedCollections.values()) {
-        checkArgument(
-            taggedColl instanceof PCollection,
-            "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
-            PCollection.class.getSimpleName(),
-            taggedColl.getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedColl;
-        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
-            WindowedValue.getFullCoder(
-                coll.getCoder(),
-                coll.getWindowingStrategy().getWindowFn().windowCoder());
-        outputCoders.add(windowedValueCoder);
-      }
-      return UnionCoder.of(outputCoders);
-    }
-  }
-
-  private static class ParDoStreamingTranslator<InputT, OutputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      ParDo.MultiOutput<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(
-        ParDo.MultiOutput<InputT, OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      ParDoTranslationHelper.translateParDo(
-          transform.getName(),
-          transform.getFn(),
-          (PCollection<InputT>) context.getInput(transform),
-          transform.getSideInputs(),
-          context.getOutputs(transform),
-          transform.getMainOutputTag(),
-          transform.getAdditionalOutputTags().getAll(),
-          context,
-          new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
-            @Override
-            public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
-                DoFn<InputT, OutputT> doFn,
-                List<PCollectionView<?>> sideInputs,
-                TupleTag<OutputT> mainOutputTag,
-                List<TupleTag<?>> additionalOutputTags,
-                FlinkStreamingTranslationContext context,
-                WindowingStrategy<?, ?> windowingStrategy,
-                Map<TupleTag<?>, Integer> tagsToLabels,
-                Coder<WindowedValue<InputT>> inputCoder,
-                Coder keyCoder,
-                Map<Integer, PCollectionView<?>> transformedSideInputs) {
-              return new DoFnOperator<>(
-                  doFn,
-                  inputCoder,
-                  mainOutputTag,
-                  additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
-                  windowingStrategy,
-                  transformedSideInputs,
-                  sideInputs,
-                  context.getPipelineOptions(),
-                  keyCoder);
-            }
-          });
-    }
-  }
-
-  private static class SplittableProcessElementsStreamingTranslator<
-      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
-
-    @Override
-    public void translateNode(
-        SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      ParDoTranslationHelper.translateParDo(
-          transform.getName(),
-          transform.newProcessFn(transform.getFn()),
-          (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
-              context.getInput(transform),
-          transform.getSideInputs(),
-          context.getOutputs(transform),
-          transform.getMainOutputTag(),
-          transform.getAdditionalOutputTags().getAll(),
-          context,
-          new ParDoTranslationHelper.DoFnOperatorFactory<
-              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
-            @Override
-            public DoFnOperator<
-                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                OutputT,
-                RawUnionValue> createDoFnOperator(
-                    DoFn<
-                        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                        OutputT> doFn,
-                    List<PCollectionView<?>> sideInputs,
-                    TupleTag<OutputT> mainOutputTag,
-                    List<TupleTag<?>> additionalOutputTags,
-                    FlinkStreamingTranslationContext context,
-                    WindowingStrategy<?, ?> windowingStrategy,
-                    Map<TupleTag<?>, Integer> tagsToLabels,
-                    Coder<
-                        WindowedValue<
-                            KeyedWorkItem<
-                                String,
-                                ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
-                    Coder keyCoder,
-                    Map<Integer, PCollectionView<?>> transformedSideInputs) {
-              return new SplittableDoFnOperator<>(
-                  doFn,
-                  inputCoder,
-                  mainOutputTag,
-                  additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
-                  windowingStrategy,
-                  transformedSideInputs,
-                  sideInputs,
-                  context.getPipelineOptions(),
-                  keyCoder);
-            }
-          });
-    }
-  }
-
-  private static class CreateViewStreamingTranslator<ElemT, ViewT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
-
-    @Override
-    public void translateNode(
-        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
-        FlinkStreamingTranslationContext context) {
-      // just forward
-      DataStream<WindowedValue<List<ElemT>>> inputDataSet =
-          context.getInputDataStream(context.getInput(transform));
-
-      PCollectionView<ViewT> view = context.getOutput(transform);
-
-      context.setOutputDataStream(view, inputDataSet);
-    }
-  }
-
-  private static class WindowAssignTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
-
-    @Override
-    public void translateNode(
-        Window.Assign<T> transform,
-        FlinkStreamingTranslationContext context) {
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<T, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      TypeInformation<WindowedValue<T>> typeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      DataStream<WindowedValue<T>> inputDataStream =
-          context.getInputDataStream(context.getInput(transform));
-
-      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
-      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
-          new FlinkAssignWindows<>(windowFn);
-
-      SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
-          .flatMap(assignWindowsFunction)
-          .name(context.getOutput(transform).getName())
-          .returns(typeInfo);
-
-      context.setOutputDataStream(context.getOutput(transform), outputDataStream);
-    }
-  }
-
-  private static class ReshuffleTranslatorStreaming<K, InputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        Reshuffle<K, InputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataStream(context.getInput(transform));
-
-      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
-
-    }
-  }
-
-
-  private static class GroupByKeyTranslator<K, InputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        GroupByKey<K, InputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      PCollection<KV<K, InputT>> input = context.getInput(transform);
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
-      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
-      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
-          inputKvCoder.getKeyCoder(),
-          inputKvCoder.getValueCoder(),
-          input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
-      WindowedValue.
-          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-          WindowedValue.getFullCoder(
-              workItemCoder,
-              input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
-
-      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
-          inputDataStream
-              .flatMap(new ToKeyedWorkItem<K, InputT>())
-              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
-      KeyedStream<
-          WindowedValue<
-              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
-          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
-      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
-          SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-
-      TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      DoFnOperator.DefaultOutputManagerFactory<
-            WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
-          new DoFnOperator.DefaultOutputManagerFactory<>();
-
-      WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
-          new WindowDoFnOperator<>(
-              reduceFn,
-              (Coder) windowedWorkItemCoder,
-              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
-              Collections.<TupleTag<?>>emptyList(),
-              outputManagerFactory,
-              windowingStrategy,
-              new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
-              Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-              context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder());
-
-      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
-      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
-      @SuppressWarnings("unchecked")
-      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
-          keyedWorkItemStream
-              .transform(
-                  transform.getName(),
-                  outputTypeInfo,
-                  (OneInputStreamOperator) doFnOperator);
-
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
-
-    }
-  }
-
-  private static class CombinePerKeyTranslator<K, InputT, OutputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      Combine.PerKey<K, InputT, OutputT>> {
-
-    @Override
-    boolean canTranslate(
-        Combine.PerKey<K, InputT, OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      // if we have a merging window strategy and side inputs we cannot
-      // translate as a proper combine. We have to group and then run the combine
-      // over the final grouped values.
-      PCollection<KV<K, InputT>> input = context.getInput(transform);
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
-      return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
-    }
-
-    @Override
-    public void translateNode(
-        Combine.PerKey<K, InputT, OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      PCollection<KV<K, InputT>> input = context.getInput(transform);
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
-      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
-      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
-          inputKvCoder.getKeyCoder(),
-          inputKvCoder.getValueCoder(),
-          input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
-      WindowedValue.
-          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-            WindowedValue.getFullCoder(
-                workItemCoder,
-                input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
-
-      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
-          inputDataStream
-              .flatMap(new ToKeyedWorkItem<K, InputT>())
-              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
-      KeyedStream<
-            WindowedValue<
-                SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
-          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
-      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
-          inputKvCoder.getKeyCoder(),
-          AppliedCombineFn.withInputCoder(
-              transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
-
-      TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
-      if (sideInputs.isEmpty()) {
-
-        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-            new WindowDoFnOperator<>(
-                reduceFn,
-                (Coder) windowedWorkItemCoder,
-                new TupleTag<KV<K, OutputT>>("main output"),
-                Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
-                windowingStrategy,
-                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
-                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-                context.getPipelineOptions(),
-                inputKvCoder.getKeyCoder());
-
-        // our operator expects WindowedValue<KeyedWorkItem> while our input stream
-        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
-        @SuppressWarnings("unchecked")
-        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-            keyedWorkItemStream.transform(
-                transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
-
-        context.setOutputDataStream(context.getOutput(transform), outDataStream);
-      } else {
-        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
-            transformSideInputs(sideInputs, context);
-
-        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-            new WindowDoFnOperator<>(
-                reduceFn,
-                (Coder) windowedWorkItemCoder,
-                new TupleTag<KV<K, OutputT>>("main output"),
-                Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
-                windowingStrategy,
-                transformSideInputs.f0,
-                sideInputs,
-                context.getPipelineOptions(),
-                inputKvCoder.getKeyCoder());
-
-        // we have to construct the two-input transform manually because we're
-        // normally not allowed to have only one of the two inputs keyed.
-
-        TwoInputTransformation<
-            WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
-            RawUnionValue,
-            WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
-            keyedWorkItemStream.getTransformation(),
-            transformSideInputs.f1.broadcast().getTransformation(),
-            transform.getName(),
-            (TwoInputStreamOperator) doFnOperator,
-            outputTypeInfo,
-            keyedWorkItemStream.getParallelism());
-
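-        // state is partitioned by the key of the first input only;
-        // the broadcast side-input stream deliberately has no key selector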
-        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
-        rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
-
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-            new SingleOutputStreamOperator(
-                keyedWorkItemStream.getExecutionEnvironment(),
-                rawFlinkTransform) {}; // we have to cheat around the ctor being protected
-
-        keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
-
-        context.setOutputDataStream(context.getOutput(transform), outDataStream);
-      }
-    }
-  }
-
-  private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
-
-    @Override
-    boolean canTranslate(
-        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
-        FlinkStreamingTranslationContext context) {
-      return true;
-    }
-
-    @Override
-    public void translateNode(
-        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      PCollection<KV<K, InputT>> input = context.getInput(transform);
-
-      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
-
-      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
-          inputKvCoder.getKeyCoder(),
-          inputKvCoder.getValueCoder(),
-          input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      WindowedValue.
-          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-          WindowedValue.getFullCoder(
-              workItemCoder,
-              input.getWindowingStrategy().getWindowFn().windowCoder());
-
-      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
-          new CoderTypeInformation<>(windowedWorkItemCoder);
-
-      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
-
-      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
-          inputDataStream
-              .flatMap(new ToKeyedWorkItem<K, InputT>())
-              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
-
-      KeyedStream<
-          WindowedValue<
-              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
-          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
-
-      context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
-    }
-  }
-
-  private static class FlattenPCollectionTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      Flatten.PCollections<T>> {
-
-    @Override
-    public void translateNode(
-        Flatten.PCollections<T> transform,
-        FlinkStreamingTranslationContext context) {
-      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
-
-      if (allInputs.isEmpty()) {
-
-        // create an empty dummy source to satisfy downstream operations;
-        // we cannot create an empty source in Flink, therefore we have to
-        // add a flatMap that simply never forwards its single element
-        DataStreamSource<String> dummySource =
-            context.getExecutionEnvironment().fromElements("dummy");
-
-        DataStream<WindowedValue<T>> result = dummySource.flatMap(
-            new FlatMapFunction<String, WindowedValue<T>>() {
-              @Override
-              public void flatMap(
-                  String s,
-                  Collector<WindowedValue<T>> collector) throws Exception {
-                // never return anything
-              }
-            }).returns(
-            new CoderTypeInformation<>(
-                WindowedValue.getFullCoder(
-                    (Coder<T>) VoidCoder.of(),
-                    GlobalWindow.Coder.INSTANCE)));
-        context.setOutputDataStream(context.getOutput(transform), result);
-
-      } else {
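-        // successively union all input streams into a single result stream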
-        DataStream<T> result = null;
-        for (PValue input : allInputs.values()) {
-          DataStream<T> current = context.getInputDataStream(input);
-          result = (result == null) ? current : result.union(current);
-        }
-        context.setOutputDataStream(context.getOutput(transform), result);
-      }
-    }
-  }
-
-  private static class ToKeyedWorkItem<K, InputT>
-      extends RichFlatMapFunction<
-      WindowedValue<KV<K, InputT>>,
-      WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
-
-    @Override
-    public void flatMap(
-        WindowedValue<KV<K, InputT>> inWithMultipleWindows,
-        Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
-
-      // we need to wrap each element in one work item per window for now,
-      // since otherwise the PushbackSideInputRunner will not correctly
-      // determine whether side inputs are ready
-      //
-      // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
-      for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
-        SingletonKeyedWorkItem<K, InputT> workItem =
-            new SingletonKeyedWorkItem<>(
-                in.getValue().getKey(),
-                in.withValue(in.getValue().getValue()));
-
-        out.collect(in.withValue(workItem));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
deleted file mode 100644
index 1a943a3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Helper for keeping track of which {@link DataStream DataStreams} map
- * to which {@link PTransform PTransforms}.
- */
-class FlinkStreamingTranslationContext {
-
-  private final StreamExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  /**
-   * Keeps a mapping between the output value of a PTransform and the Flink
-   * operator that produced it, established once the corresponding PTransform
-   * has been translated to its Flink equivalent.
-   */
-  private final Map<PValue, DataStream<?>> dataStreams;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
-    this.env = checkNotNull(env);
-    this.options = checkNotNull(options);
-    this.dataStreams = new HashMap<>();
-  }
-
-  public StreamExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataStream<T> getInputDataStream(PValue value) {
-    return (DataStream<T>) dataStreams.get(value);
-  }
-
-  public void setOutputDataStream(PValue value, DataStream<?> set) {
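-    // only the first stream registered for a given PValue is kept, later calls are no-ops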
-    if (!dataStreams.containsKey(value)) {
-      dataStreams.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the {@link AppliedPTransform} that is currently being translated and
-   * that carries the inputs and outputs of the current transform.
-   *
-   * @param currentTransform the transform currently being translated
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
-  public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
-    Coder<T> valueCoder = collection.getCoder();
-
-    return WindowedValue.getFullCoder(
-        valueCoder,
-        collection.getWindowingStrategy().getWindowFn().windowCoder());
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
-    Coder<T> valueCoder = collection.getCoder();
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-        WindowedValue.getFullCoder(
-            valueCoder,
-            collection.getWindowingStrategy().getWindowFn().windowCoder());
-
-    return new CoderTypeInformation<>(windowedValueCoder);
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
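-    // the transform parameter only fixes the return type, the value comes from currentTransform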
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
-  }
-
-  public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
-    return currentTransform.getInputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
-  }
-
-  public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
-      PTransform<?, OutputT> transform) {
-    return currentTransform.getOutputs();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
deleted file mode 100644
index f955f2a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Flink streaming overrides for various view (side input) transforms.
- */
-class FlinkStreamingViewOverrides {
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Flink runner in streaming mode.
-   */
-  static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private final transient FlinkRunner runner;
-
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * View.AsMultimap View.AsMultimap} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    private final transient FlinkRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsList View.AsList} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsIterable View.AsIterable} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link View.AsSingleton View.AsSingleton} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
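-      // SingletonCombine never actually combines: apply() throws on more than one
-      // element and identity() supplies the default value (if any) when empty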
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingCombineGloballyAsSingletonView(
-        FlinkRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined,
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require that the input {@link PCollection} fit in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<T>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateFlinkPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private PCollectionView<ViewT> view;
-
-    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateFlinkPCollectionView<>(view);
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
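-      // a primitive transform: the streaming translator materializes the view, we only return it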
-      return view;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
deleted file mode 100644
index 3acc3ea..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
- */
-class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
-
-  private TranslationMode translationMode;
-
-  private final FlinkPipelineOptions options;
-
-  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
-    this.translationMode = defaultMode;
-    this.options = options;
-  }
-
-  public TranslationMode getTranslationMode() {
-
-    // the user-specified streaming option overrides any detected translation mode
-    if (options.isStreaming()) {
-      return TranslationMode.STREAMING;
-    }
-
-    return translationMode;
-  }
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {}
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    Class<? extends PTransform> transformClass = node.getTransform().getClass();
-    if (transformClass == Read.Unbounded.class) {
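-      // a single unbounded read forces the entire pipeline into streaming mode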
-      LOG.info("Found {}. Switching to streaming execution.", transformClass);
-      translationMode = TranslationMode.STREAMING;
-    }
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-}